X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=snaps%2Fopenstack%2Ftests%2Fcreate_security_group_tests.py;h=090d7361c8190f741157dee74209baab898836f0;hb=8d64362faf1998bceae353da3d29883883a6c351;hp=dd28d7dc752c6d111fc9281e8ee64dd20282bc81;hpb=feae63d11f8295a0d9327496f42949ad5b67fca4;p=snaps.git diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index dd28d7d..090d736 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -15,13 +15,13 @@ import unittest import uuid +from snaps.config.security_group import ( + SecurityGroupConfig, SecurityGroupRuleConfig, + SecurityGroupRuleConfigError, SecurityGroupConfigError) from snaps.openstack import create_security_group -from snaps.openstack.create_security_group import (SecurityGroupSettings, - SecurityGroupRuleSettings, - Direction, Ethertype, - Protocol, - SecurityGroupRuleSettingsError, - SecurityGroupSettingsError) +from snaps.openstack.create_security_group import ( + SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype, + Protocol) from snaps.openstack.tests import validation_utils from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase from snaps.openstack.utils import neutron_utils @@ -35,32 +35,64 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**dict()) def test_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(sec_grp_name='foo') def test_config_with_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'}) def test_name_and_direction(self): settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) def test_config_name_and_direction(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) + + def test_proto_ah_str(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'ah'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) + + def test_proto_ah_value(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 51}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) + + def test_proto_any(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'any'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) + + def test_proto_null(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'null'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) def test_all(self): settings = SecurityGroupRuleSettings( @@ -71,10 +103,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): remote_ip_prefix='prfx') self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.icmp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.icmp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -92,10 +124,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): 'remote_ip_prefix': 'prfx'}) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.tcp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.tcp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -107,11 +139,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(**dict()) def test_name_only(self): @@ -123,17 +155,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): self.assertEqual('foo', settings.name) def test_invalid_rule(self): - rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', - direction=Direction.ingress) - with self.assertRaises(SecurityGroupSettingsError): + rule_setting = SecurityGroupRuleSettings( + sec_grp_name='bar', direction=Direction.ingress, + description='test_rule_1') + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) def test_all(self): rule_settings = list() rule_settings.append(SecurityGroupRuleSettings( - sec_grp_name='bar', direction=Direction.egress)) + sec_grp_name='bar', direction=Direction.egress, + description='test_rule_1')) rule_settings.append(SecurityGroupRuleSettings( - sec_grp_name='bar', direction=Direction.ingress)) + sec_grp_name='bar', direction=Direction.ingress, + description='test_rule_2')) settings = SecurityGroupSettings( name='bar', description='fubar', project_name='foo', rule_settings=rule_settings) @@ -157,8 +192,8 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): self.assertEqual('foo', settings.project_name) self.assertEqual(1, len(settings.rule_settings)) self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) - self.assertEqual(Direction.ingress, - settings.rule_settings[0].direction) + self.assertEqual(Direction.ingress.value, + settings.rule_settings[0].direction.value) class CreateSecurityGroupTests(OSIntegrationTestCase): @@ -194,14 +229,72 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, - self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) + self.assertIsNotNone(sec_grp) + + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) + + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group())) + + def test_create_group_admin_user_to_new_project(self): + """ + Tests the creation of an OpenStack Security Group without custom rules. + """ + # Create Image + sec_grp_settings = SecurityGroupConfig( + name=self.sec_grp_name, description='hello group', + project_name=self.admin_os_creds.project_name) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) + self.assertIsNotNone(sec_grp) + + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) + + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + + def test_create_group_new_user_to_admin_project(self): + """ + Tests the creation of an OpenStack Security Group without custom rules. + """ + # Create Image + sec_grp_settings = SecurityGroupConfig( + name=self.sec_grp_name, description='hello group', + project_name=self.os_creds.project_name) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.admin_os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) self.assertIsNotNone(sec_grp) validation_utils.objects_equivalent( @@ -212,21 +305,32 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_create_delete_group(self): """ Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) created_sec_grp = self.sec_grp_creator.create() self.assertIsNotNone(created_sec_grp) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group())) + neutron_utils.delete_security_group(self.neutron, created_sec_grp) self.assertIsNone(neutron_utils.get_security_group( - self.neutron, self.sec_grp_creator.sec_grp_settings.name)) + self.neutron, + sec_grp_settings=self.sec_grp_creator.sec_grp_settings)) self.sec_grp_creator.clean() @@ -238,17 +342,18 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_settings = SecurityGroupSettings( + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, - self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group( @@ -257,6 +362,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + + def test_create_group_with_one_complex_rule(self): + """ + Tests the creation of an OpenStack Security Group with one simple + custom rule. + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append( + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_1')) + sec_grp_settings = SecurityGroupConfig( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) + + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_create_group_with_several_rules(self): """ Tests the creation of an OpenStack Security Group with one simple @@ -265,29 +410,29 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv6, + description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) - sec_grp_settings = SecurityGroupSettings( + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_3')) + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, - self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group( @@ -296,6 +441,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_add_rule(self): """ Tests the creation of an OpenStack Security Group with one simple @@ -304,28 +454,39 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_settings = SecurityGroupSettings( + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, - self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) + + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) + + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + rules = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) - self.sec_grp_creator.add_rule(SecurityGroupRuleSettings( + self.sec_grp_creator.add_rule(SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, - direction=Direction.egress, protocol=Protocol.icmp)) + direction=Direction.egress, protocol=Protocol.icmp, + description='test_rule_2')) rules2 = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) + 1, len(rules2)) @@ -338,29 +499,29 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv6, + description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) - sec_grp_settings = SecurityGroupSettings( + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_3')) + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, - self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group( @@ -369,6 +530,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + self.sec_grp_creator.remove_rule( rule_id=rules[0].id) rules_after_del = neutron_utils.get_rules_by_security_group( @@ -384,42 +550,110 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv6, + description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) - sec_grp_settings = SecurityGroupSettings( + SecurityGroupRuleConfig( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_3')) + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, - self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + self.neutron, sec_grp_settings=sec_grp_settings) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0]) rules_after_del = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) - 1, len(rules_after_del)) -# TODO - Add more tests with different rules. Rule creation parameters can be -# somewhat complex + +def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()): + """ + Returns True is the settings on a security group are properly contained + on the SNAPS SecurityGroup domain object + :param neutron: the neutron client + :param sec_grp_settings: the security group configuration + :param sec_grp: the SNAPS-OO security group object + :param rules: collection of SNAPS-OO security group rule objects + :return: T/F + """ + return (sec_grp.description == sec_grp_settings.description and + sec_grp.name == sec_grp_settings.name and + validate_sec_grp_rules( + neutron, sec_grp_settings.rule_settings, rules)) + + +def validate_sec_grp_rules(neutron, rule_settings, rules): + """ + Returns True is the settings on a security group rule are properly + contained on the SNAPS SecurityGroupRule domain object. + This function will only operate on rules that contain a description as + this is the only means to tell if the rule is custom or defaulted by + OpenStack + :param neutron: the neutron client + :param rule_settings: collection of SecurityGroupRuleConfig objects + :param rules: a collection of SecurityGroupRule domain objects + :return: T/F + """ + + for rule_setting in rule_settings: + if rule_setting.description: + match = False + for rule in rules: + sec_grp = neutron_utils.get_security_group( + neutron, sec_grp_name=rule_setting.sec_grp_name) + + setting_eth_type = create_security_group.Ethertype.IPv4 + if rule_setting.ethertype: + setting_eth_type = rule_setting.ethertype + + if not sec_grp: + return False + + proto_str = 'null' + if rule.protocol: + proto_str = rule.protocol + + if (rule.description == rule_setting.description and + rule.direction == rule_setting.direction.name and + rule.ethertype == setting_eth_type.name and + rule.port_range_max == rule_setting.port_range_max and + rule.port_range_min == rule_setting.port_range_min and + proto_str == str(rule_setting.protocol.value) and + rule.remote_group_id == rule_setting.remote_group_id and + rule.remote_ip_prefix == rule_setting.remote_ip_prefix and + rule.security_group_id == sec_grp.id): + match = True + break + + if not match: + return False + + return True