Refactored neutron_utils#get_subnet_by_name() to get_subnet()
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
index 75c6387..a0392ea 100644 (file)
@@ -16,10 +16,9 @@ import unittest
 import uuid
 
 from snaps.openstack import create_security_group
-from snaps.openstack.create_security_group import (SecurityGroupSettings,
-                                                   SecurityGroupRuleSettings,
-                                                   Direction, Ethertype,
-                                                   Protocol)
+from snaps.openstack.create_security_group import (
+    SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
+    Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
 from snaps.openstack.tests import validation_utils
 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
 from snaps.openstack.utils import neutron_utils
@@ -33,19 +32,19 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
     """
 
     def test_no_params(self):
-        with self.assertRaises(Exception):
+        with self.assertRaises(SecurityGroupRuleSettingsError):
             SecurityGroupRuleSettings()
 
     def test_empty_config(self):
-        with self.assertRaises(Exception):
+        with self.assertRaises(SecurityGroupRuleSettingsError):
             SecurityGroupRuleSettings(**dict())
 
     def test_name_only(self):
-        with self.assertRaises(Exception):
+        with self.assertRaises(SecurityGroupRuleSettingsError):
             SecurityGroupRuleSettings(sec_grp_name='foo')
 
     def test_config_with_name_only(self):
-        with self.assertRaises(Exception):
+        with self.assertRaises(SecurityGroupRuleSettingsError):
             SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
 
     def test_name_and_direction(self):
@@ -105,11 +104,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
     """
 
     def test_no_params(self):
-        with self.assertRaises(Exception):
+        with self.assertRaises(SecurityGroupSettingsError):
             SecurityGroupSettings()
 
     def test_empty_config(self):
-        with self.assertRaises(Exception):
+        with self.assertRaises(SecurityGroupSettingsError):
             SecurityGroupSettings(**dict())
 
     def test_name_only(self):
@@ -121,17 +120,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
         self.assertEqual('foo', settings.name)
 
     def test_invalid_rule(self):
-        rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
-                                                 direction=Direction.ingress)
-        with self.assertRaises(Exception):
+        rule_setting = SecurityGroupRuleSettings(
+            sec_grp_name='bar', direction=Direction.ingress,
+            description='test_rule_1')
+        with self.assertRaises(SecurityGroupSettingsError):
             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
 
     def test_all(self):
         rule_settings = list()
         rule_settings.append(SecurityGroupRuleSettings(
-            sec_grp_name='bar', direction=Direction.egress))
+            sec_grp_name='bar', direction=Direction.egress,
+            description='test_rule_1'))
         rule_settings.append(SecurityGroupRuleSettings(
-            sec_grp_name='bar', direction=Direction.ingress))
+            sec_grp_name='bar', direction=Direction.ingress,
+            description='test_rule_2'))
         settings = SecurityGroupSettings(
             name='bar', description='fubar', project_name='foo',
             rule_settings=rule_settings)
@@ -210,6 +212,69 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group()))
+
+    def test_create_group_admin_user_to_new_project(self):
+        """
+        Tests the creation of an OpenStack Security Group without custom rules.
+        """
+        # Create Image
+        sec_grp_settings = SecurityGroupSettings(
+            name=self.sec_grp_name, description='hello group',
+            project_name=self.admin_os_creds.project_name)
+        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+            self.os_creds, sec_grp_settings)
+        self.sec_grp_creator.create()
+
+        sec_grp = neutron_utils.get_security_group(self.neutron,
+                                                   self.sec_grp_name)
+        self.assertIsNotNone(sec_grp)
+
+        validation_utils.objects_equivalent(
+            self.sec_grp_creator.get_security_group(), sec_grp)
+        rules = neutron_utils.get_rules_by_security_group(
+            self.neutron, self.sec_grp_creator.get_security_group())
+        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+                                            rules)
+
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
+    def test_create_group_new_user_to_admin_project(self):
+        """
+        Tests the creation of an OpenStack Security Group without custom rules.
+        """
+        # Create Image
+        sec_grp_settings = SecurityGroupSettings(
+            name=self.sec_grp_name, description='hello group',
+            project_name=self.os_creds.project_name)
+        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+            self.admin_os_creds, sec_grp_settings)
+        self.sec_grp_creator.create()
+
+        sec_grp = neutron_utils.get_security_group(self.neutron,
+                                                   self.sec_grp_name)
+        self.assertIsNotNone(sec_grp)
+
+        validation_utils.objects_equivalent(
+            self.sec_grp_creator.get_security_group(), sec_grp)
+        rules = neutron_utils.get_rules_by_security_group(
+            self.neutron, self.sec_grp_creator.get_security_group())
+        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+                                            rules)
+
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_create_delete_group(self):
         """
         Tests the creation of an OpenStack Security Group without custom rules.
@@ -222,6 +287,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         created_sec_grp = self.sec_grp_creator.create()
         self.assertIsNotNone(created_sec_grp)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group()))
+
         neutron_utils.delete_security_group(self.neutron, created_sec_grp)
         self.assertIsNone(neutron_utils.get_security_group(
             self.neutron, self.sec_grp_creator.sec_grp_settings.name))
@@ -236,8 +306,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -255,6 +326,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
+    def test_create_group_with_one_complex_rule(self):
+        """
+        Tests the creation of an OpenStack Security Group with one simple
+        custom rule.
+        """
+        # Create Image
+        sec_grp_rule_settings = list()
+        sec_grp_rule_settings.append(
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_1'))
+        sec_grp_settings = SecurityGroupSettings(
+            name=self.sec_grp_name, description='hello group',
+            rule_settings=sec_grp_rule_settings)
+        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+            self.os_creds, sec_grp_settings)
+        self.sec_grp_creator.create()
+
+        sec_grp = neutron_utils.get_security_group(self.neutron,
+                                                   self.sec_grp_name)
+        validation_utils.objects_equivalent(
+            self.sec_grp_creator.get_security_group(), sec_grp)
+        rules = neutron_utils.get_rules_by_security_group(
+            self.neutron, self.sec_grp_creator.get_security_group())
+        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+                                            rules)
+
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_create_group_with_several_rules(self):
         """
         Tests the creation of an OpenStack Security Group with one simple
@@ -263,20 +374,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv6))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+                description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv4,
-                                      port_range_min=10,
-                                      port_range_max=20))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_3'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -294,6 +405,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
     def test_add_rule(self):
         """
         Tests the creation of an OpenStack Security Group with one simple
@@ -302,8 +418,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -315,6 +432,15 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
                                                    self.sec_grp_name)
         validation_utils.objects_equivalent(
             self.sec_grp_creator.get_security_group(), sec_grp)
+
+        rules = neutron_utils.get_rules_by_security_group(
+            self.neutron, self.sec_grp_creator.get_security_group())
+
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
         rules = neutron_utils.get_rules_by_security_group(
             self.neutron, self.sec_grp_creator.get_security_group())
         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
@@ -323,7 +449,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
 
         self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
             sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
-            direction=Direction.egress, protocol=Protocol.icmp))
+            direction=Direction.egress, protocol=Protocol.icmp,
+            description='test_rule_2'))
         rules2 = neutron_utils.get_rules_by_security_group(
             self.neutron, self.sec_grp_creator.get_security_group())
         self.assertEqual(len(rules) + 1, len(rules2))
@@ -336,20 +463,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv6))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+                description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv4,
-                                      port_range_min=10,
-                                      port_range_max=20))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_3'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -367,6 +494,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
         self.sec_grp_creator.remove_rule(
             rule_id=rules[0].id)
         rules_after_del = neutron_utils.get_rules_by_security_group(
@@ -382,20 +514,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.ingress))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+                description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv6))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+                description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
-                                      direction=Direction.egress,
-                                      protocol=Protocol.udp,
-                                      ethertype=Ethertype.IPv4,
-                                      port_range_min=10,
-                                      port_range_max=20))
+            SecurityGroupRuleSettings(
+                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+                port_range_min=10, port_range_max=20,
+                description='test_rule_3'))
         sec_grp_settings = SecurityGroupSettings(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
@@ -407,17 +539,86 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
                                                    self.sec_grp_name)
         validation_utils.objects_equivalent(
             self.sec_grp_creator.get_security_group(), sec_grp)
+
         rules = neutron_utils.get_rules_by_security_group(
             self.neutron, self.sec_grp_creator.get_security_group())
         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
+        self.assertTrue(
+            validate_sec_grp(
+                self.neutron, self.sec_grp_creator.sec_grp_settings,
+                self.sec_grp_creator.get_security_group(), rules))
+
         self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
         rules_after_del = neutron_utils.get_rules_by_security_group(
             self.neutron,
             self.sec_grp_creator.get_security_group())
         self.assertEqual(len(rules) - 1, len(rules_after_del))
 
-# TODO - Add more tests with different rules. Rule creation parameters can be
-# somewhat complex
+
+def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+    """
+    Returns True is the settings on a security group are properly contained
+    on the SNAPS SecurityGroup domain object
+    :param neutron: the neutron client
+    :param sec_grp_settings: the security group configuration
+    :param sec_grp: the SNAPS-OO security group object
+    :param rules: collection of SNAPS-OO security group rule objects
+    :return: T/F
+    """
+    return (sec_grp.description == sec_grp_settings.description and
+            sec_grp.name == sec_grp_settings.name and
+            validate_sec_grp_rules(
+                neutron, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, rule_settings, rules):
+    """
+    Returns True is the settings on a security group rule are properly
+    contained on the SNAPS SecurityGroupRule domain object.
+    This function will only operate on rules that contain a description as
+    this is the only means to tell if the rule is custom or defaulted by
+    OpenStack
+    :param neutron: the neutron client
+    :param rule_settings: collection of SecurityGroupRuleSettings objects
+    :param rules: a collection of SecurityGroupRule domain objects
+    :return: T/F
+    """
+
+    for rule_setting in rule_settings:
+        if rule_setting.description:
+            match = False
+            for rule in rules:
+                if rule_setting.protocol == Protocol.null:
+                    setting_proto = None
+                else:
+                    setting_proto = rule_setting.protocol.name
+
+                sec_grp = neutron_utils.get_security_group(
+                    neutron, rule_setting.sec_grp_name)
+
+                setting_eth_type = create_security_group.Ethertype.IPv4
+                if rule_setting.ethertype:
+                    setting_eth_type = rule_setting.ethertype
+
+                if not sec_grp:
+                    return False
+
+                if (rule.description == rule_setting.description and
+                    rule.direction == rule_setting.direction.name and
+                    rule.ethertype == setting_eth_type.name and
+                    rule.port_range_max == rule_setting.port_range_max and
+                    rule.port_range_min == rule_setting.port_range_min and
+                    rule.protocol == setting_proto and
+                    rule.remote_group_id == rule_setting.remote_group_id and
+                    rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+                    rule.security_group_id == sec_grp.id):
+                    match = True
+                    break
+
+            if not match:
+                return False
+
+    return True