Expand OpenStackSecurityGroup class tests. 25/38225/1
authorspisarski <s.pisarski@cablelabs.com>
Wed, 26 Jul 2017 21:12:16 +0000 (15:12 -0600)
committerspisarski <s.pisarski@cablelabs.com>
Wed, 26 Jul 2017 21:12:16 +0000 (15:12 -0600)
Improved validation and added another test case.

JIRA: SNAPS-148

Change-Id: I249ac0fcce502dd91c82cc2bfb54ca22da5e33bb
Signed-off-by: spisarski <s.pisarski@cablelabs.com>
docs/how-to-use/IntegrationTests.rst
snaps/domain/network.py
snaps/domain/test/network_tests.py
snaps/openstack/tests/create_security_group_tests.py
snaps/openstack/utils/neutron_utils.py

index 70e51b9..92a5301 100644 (file)
@@ -27,7 +27,10 @@ create_security_group_tests.py - CreateSecurityGroupTests
 |                                       |               | some other process                                        |
 +---------------------------------------+---------------+-----------------------------------------------------------+
 | test_create_group_with_one_simple_rule| Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a     |
-|                                       | Neutron 2     | security group with a single rule                         |
+|                                       | Neutron 2     | security group with a single simple rule                  |
++---------------------------------------+---------------+-----------------------------------------------------------+
+| test_create_group_with_one_complex    | Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a     |
+| _rule                                 | Neutron 2     | security group with a single complex rule                 |
 +---------------------------------------+---------------+-----------------------------------------------------------+
 | test_create_group_with_several_rules  | Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a     |
 |                                       | Neutron 2     | security group with several rules                         |
@@ -243,7 +246,7 @@ create_stack_tests.py - CreateStackSuccessTests
 +---------------------------------------+---------------+-----------------------------------------------------------+
 
 create_stack_tests.py - CreateStackNegativeTests
---------------------------------------------------
+------------------------------------------------
 
 +----------------------------------------+---------------+-----------------------------------------------------------+
 | Test Name                              | Neutron API   | Description                                               |
index 68889f1..2c71db8 100644 (file)
@@ -130,6 +130,7 @@ class SecurityGroup:
         """
         self.name = kwargs.get('name')
         self.id = kwargs.get('id')
+        self.description = kwargs.get('description')
         self.project_id = kwargs.get('project_id', kwargs.get('tenant_id'))
 
     def __eq__(self, other):
index 592090b..4fd20d4 100644 (file)
@@ -133,10 +133,11 @@ class SecurityGroupDomainObjectTests(unittest.TestCase):
 
     def test_construction_proj_id_kwargs(self):
         sec_grp = SecurityGroup(
-            **{'name': 'name', 'id': 'id',
-               'project_id': 'foo'})
+            **{'name': 'name', 'id': 'id', 'project_id': 'foo',
+               'description': 'test desc'})
         self.assertEqual('name', sec_grp.name)
         self.assertEqual('id', sec_grp.id)
+        self.assertEqual('test desc', sec_grp.description)
         self.assertEqual('foo', sec_grp.project_id)
 
     def test_construction_tenant_id_kwargs(self):
@@ -146,11 +147,14 @@ class SecurityGroupDomainObjectTests(unittest.TestCase):
         self.assertEqual('name', sec_grp.name)
         self.assertEqual('id', sec_grp.id)
         self.assertEqual('foo', sec_grp.project_id)
+        self.assertIsNone(sec_grp.description)
 
     def test_construction_named(self):
-        sec_grp = SecurityGroup(tenant_id='foo', id='id', name='name')
+        sec_grp = SecurityGroup(description='test desc', tenant_id='foo',
+                                id='id', name='name')
         self.assertEqual('name', sec_grp.name)
         self.assertEqual('id', sec_grp.id)
+        self.assertEqual('test desc', sec_grp.description)
         self.assertEqual('foo', sec_grp.project_id)
 
 
index 7cae62b..a0392ea 100644 (file)
@@ -120,17 +120,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
         self.assertEqual('foo', settings.name)
 
     def test_invalid_rule(self):
-        rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
-                                                 direction=Direction.ingress)
+        rule_setting = SecurityGroupRuleSettings(
+            sec_grp_name='bar', direction=Direction.ingress,
+            description='test_rule_1')
         with self.assertRaises(SecurityGroupSettingsError):
             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
 
     def test_all(self):
         rule_settings = list()
         rule_settings.append(SecurityGroupRuleSettings(
-            sec_grp_name='bar', direction=Direction.egress))
+            sec_grp_name='bar', direction=Direction.egress,
+            description='test_rule_1'))
         rule_settings.append(SecurityGroupRuleSettings(
-            sec_grp_name='bar', direction=Direction.ingress))
+            sec_grp_name='bar', direction=Direction.ingress,
+            description='test_rule_2'))
         settings = SecurityGroupSettings(
             name='bar', description='fubar', project_name='foo',
             rule_settings=rule_settings)
@@ -209,6 +212,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group()))
+
     def test_create_group_admin_user_to_new_project(self):
         """
         Tests the creation of an OpenStack Security Group without custom rules.
@@ -233,6 +241,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_create_group_new_user_to_admin_project(self):
         """
         Tests the creation of an OpenStack Security Group without custom rules.
@@ -257,6 +270,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_create_delete_group(self):
         """
         Tests the creation of an OpenStack Security Group without custom rules.
@@ -269,6 +287,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         created_sec_grp = self.sec_grp_creator.create()
         self.assertIsNotNone(created_sec_grp)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group()))
+
         neutron_utils.delete_security_group(self.neutron, created_sec_grp)
         self.assertIsNone(neutron_utils.get_security_group(
             self.neutron, self.sec_grp_creator.sec_grp_settings.name))
@@ -283,8 +306,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -302,6 +326,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
+    def test_create_group_with_one_complex_rule(self):
+        """
+        Tests the creation of an OpenStack Security Group with one simple
+        custom rule.
+        """
+        # Create Image
+        sec_grp_rule_settings = list()
+        sec_grp_rule_settings.append(
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_1'))
+        sec_grp_settings = SecurityGroupSettings(
+            name=self.sec_grp_name, description='hello group',
+            rule_settings=sec_grp_rule_settings)
+        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+            self.os_creds, sec_grp_settings)
+        self.sec_grp_creator.create()
+
+        sec_grp = neutron_utils.get_security_group(self.neutron,
+                                                   self.sec_grp_name)
+        validation_utils.objects_equivalent(
+            self.sec_grp_creator.get_security_group(), sec_grp)
+        rules = neutron_utils.get_rules_by_security_group(
+            self.neutron, self.sec_grp_creator.get_security_group())
+        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+                                            rules)
+
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_create_group_with_several_rules(self):
         """
         Tests the creation of an OpenStack Security Group with one simple
@@ -310,20 +374,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv6))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+                description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv4,
-                                      port_range_min=10,
-                                      port_range_max=20))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_3'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -341,6 +405,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_add_rule(self):
         """
         Tests the creation of an OpenStack Security Group with one simple
@@ -349,8 +418,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -362,6 +432,15 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
                                                    self.sec_grp_name)
         validation_utils.objects_equivalent(
             self.sec_grp_creator.get_security_group(), sec_grp)
+
+        rules = neutron_utils.get_rules_by_security_group(
+            self.neutron, self.sec_grp_creator.get_security_group())
+
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
         rules = neutron_utils.get_rules_by_security_group(
             self.neutron, self.sec_grp_creator.get_security_group())
         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
@@ -370,7 +449,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
 
         self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
             sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
-            direction=Direction.egress, protocol=Protocol.icmp))
+            direction=Direction.egress, protocol=Protocol.icmp,
+            description='test_rule_2'))
         rules2 = neutron_utils.get_rules_by_security_group(
             self.neutron, self.sec_grp_creator.get_security_group())
         self.assertEqual(len(rules) + 1, len(rules2))
@@ -383,20 +463,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv6))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+                description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv4,
-                                      port_range_min=10,
-                                      port_range_max=20))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_3'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -414,6 +494,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
         self.sec_grp_creator.remove_rule(
             rule_id=rules[0].id)
         rules_after_del = neutron_utils.get_rules_by_security_group(
@@ -429,20 +514,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv6))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+                description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv4,
-                                      port_range_min=10,
-                                      port_range_max=20))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_3'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -454,17 +539,86 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
                                                    self.sec_grp_name)
         validation_utils.objects_equivalent(
             self.sec_grp_creator.get_security_group(), sec_grp)
+
         rules = neutron_utils.get_rules_by_security_group(
             self.neutron, self.sec_grp_creator.get_security_group())
         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
         self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
         rules_after_del = neutron_utils.get_rules_by_security_group(
             self.neutron,
             self.sec_grp_creator.get_security_group())
         self.assertEqual(len(rules) - 1, len(rules_after_del))
 
-# TODO - Add more tests with different rules. Rule creation parameters can be
-# somewhat complex
+
+def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+    """
+    Returns True is the settings on a security group are properly contained
+    on the SNAPS SecurityGroup domain object
+    :param neutron: the neutron client
+    :param sec_grp_settings: the security group configuration
+    :param sec_grp: the SNAPS-OO security group object
+    :param rules: collection of SNAPS-OO security group rule objects
+    :return: T/F
+    """
+    return (sec_grp.description == sec_grp_settings.description and
+            sec_grp.name == sec_grp_settings.name and
+            validate_sec_grp_rules(
+                neutron, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, rule_settings, rules):
+    """
+    Returns True is the settings on a security group rule are properly
+    contained on the SNAPS SecurityGroupRule domain object.
+    This function will only operate on rules that contain a description as
+    this is the only means to tell if the rule is custom or defaulted by
+    OpenStack
+    :param neutron: the neutron client
+    :param rule_settings: collection of SecurityGroupRuleSettings objects
+    :param rules: a collection of SecurityGroupRule domain objects
+    :return: T/F
+    """
+
+    for rule_setting in rule_settings:
+        if rule_setting.description:
+            match = False
+            for rule in rules:
+                if rule_setting.protocol == Protocol.null:
+                    setting_proto = None
+                else:
+                    setting_proto = rule_setting.protocol.name
+
+                sec_grp = neutron_utils.get_security_group(
+                    neutron, rule_setting.sec_grp_name)
+
+                setting_eth_type = create_security_group.Ethertype.IPv4
+                if rule_setting.ethertype:
+                    setting_eth_type = rule_setting.ethertype
+
+                if not sec_grp:
+                    return False
+
+                if (rule.description == rule_setting.description and
+                    rule.direction == rule_setting.direction.name and
+                    rule.ethertype == setting_eth_type.name and
+                    rule.port_range_max == rule_setting.port_range_max and
+                    rule.port_range_min == rule_setting.port_range_min and
+                    rule.protocol == setting_proto and
+                    rule.remote_group_id == rule_setting.remote_group_id and
+                    rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+                    rule.security_group_id == sec_grp.id):
+                    match = True
+                    break
+
+            if not match:
+                return False
+
+    return True
index bf8cb08..2f8ef7e 100644 (file)
@@ -331,11 +331,7 @@ def create_security_group(neutron, keystone, sec_grp_settings):
                 sec_grp_settings.name)
     os_group = neutron.create_security_group(
         sec_grp_settings.dict_for_neutron(keystone))
-    return SecurityGroup(
-        id=os_group['security_group']['id'],
-        name=os_group['security_group']['name'],
-        project_id=os_group['security_group'].get(
-            'project_id', os_group['security_group'].get('tenant_id')))
+    return SecurityGroup(**os_group['security_group'])
 
 
 def delete_security_group(neutron, sec_grp):
@@ -360,9 +356,7 @@ def get_security_group(neutron, name):
     groups = neutron.list_security_groups(**{'name': name})
     for group in groups['security_groups']:
         if group['name'] == name:
-            return SecurityGroup(
-                id=group['id'], name=group['name'],
-                project_id=group.get('project_id', group.get('tenant_id')))
+            return SecurityGroup(**group)
     return None
 
 
@@ -378,9 +372,7 @@ def get_security_group_by_id(neutron, sec_grp_id):
     groups = neutron.list_security_groups(**{'id': sec_grp_id})
     for group in groups['security_groups']:
         if group['id'] == sec_grp_id:
-            return SecurityGroup(
-                id=group['id'], name=group['name'],
-                project_id=group.get('project_id', group.get('tenant_id')))
+            return SecurityGroup(**group)
     return None