Updated comments for ID version 2 vs. 3
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
index 99ea53a..090d736 100644 (file)
 import unittest
 import uuid
 
+from snaps.config.security_group import (
+    SecurityGroupConfig,  SecurityGroupRuleConfig,
+    SecurityGroupRuleConfigError, SecurityGroupConfigError)
 from snaps.openstack import create_security_group
 from snaps.openstack.create_security_group import (
     SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
-    Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
+    Protocol)
 from snaps.openstack.tests import validation_utils
 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
 from snaps.openstack.utils import neutron_utils
@@ -32,32 +35,64 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
     """
 
     def test_no_params(self):
-        with self.assertRaises(SecurityGroupRuleSettingsError):
+        with self.assertRaises(SecurityGroupRuleConfigError):
             SecurityGroupRuleSettings()
 
     def test_empty_config(self):
-        with self.assertRaises(SecurityGroupRuleSettingsError):
+        with self.assertRaises(SecurityGroupRuleConfigError):
             SecurityGroupRuleSettings(**dict())
 
     def test_name_only(self):
-        with self.assertRaises(SecurityGroupRuleSettingsError):
+        with self.assertRaises(SecurityGroupRuleConfigError):
             SecurityGroupRuleSettings(sec_grp_name='foo')
 
     def test_config_with_name_only(self):
-        with self.assertRaises(SecurityGroupRuleSettingsError):
+        with self.assertRaises(SecurityGroupRuleConfigError):
             SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
 
     def test_name_and_direction(self):
         settings = SecurityGroupRuleSettings(sec_grp_name='foo',
                                              direction=Direction.ingress)
         self.assertEqual('foo', settings.sec_grp_name)
-        self.assertEqual(Direction.ingress, settings.direction)
+        self.assertEqual(Direction.ingress.value, settings.direction.value)
 
     def test_config_name_and_direction(self):
         settings = SecurityGroupRuleSettings(
             **{'sec_grp_name': 'foo', 'direction': 'ingress'})
         self.assertEqual('foo', settings.sec_grp_name)
-        self.assertEqual(Direction.ingress, settings.direction)
+        self.assertEqual(Direction.ingress.value, settings.direction.value)
+
+    def test_proto_ah_str(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 'ah'})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress.value, settings.direction.value)
+        self.assertEqual(Protocol.ah.value, settings.protocol.value)
+
+    def test_proto_ah_value(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 51})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress.value, settings.direction.value)
+        self.assertEqual(Protocol.ah.value, settings.protocol.value)
+
+    def test_proto_any(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 'any'})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress.value, settings.direction.value)
+        self.assertEqual(Protocol.null.value, settings.protocol.value)
+
+    def test_proto_null(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 'null'})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress.value, settings.direction.value)
+        self.assertEqual(Protocol.null.value, settings.protocol.value)
 
     def test_all(self):
         settings = SecurityGroupRuleSettings(
@@ -68,10 +103,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
             remote_ip_prefix='prfx')
         self.assertEqual('foo', settings.sec_grp_name)
         self.assertEqual('fubar', settings.description)
-        self.assertEqual(Direction.egress, settings.direction)
+        self.assertEqual(Direction.egress.value, settings.direction.value)
         self.assertEqual('rgi', settings.remote_group_id)
-        self.assertEqual(Protocol.icmp, settings.protocol)
-        self.assertEqual(Ethertype.IPv6, settings.ethertype)
+        self.assertEqual(Protocol.icmp.value, settings.protocol.value)
+        self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
         self.assertEqual(1, settings.port_range_min)
         self.assertEqual(2, settings.port_range_max)
         self.assertEqual('prfx', settings.remote_ip_prefix)
@@ -89,10 +124,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
                'remote_ip_prefix': 'prfx'})
         self.assertEqual('foo', settings.sec_grp_name)
         self.assertEqual('fubar', settings.description)
-        self.assertEqual(Direction.egress, settings.direction)
+        self.assertEqual(Direction.egress.value, settings.direction.value)
         self.assertEqual('rgi', settings.remote_group_id)
-        self.assertEqual(Protocol.tcp, settings.protocol)
-        self.assertEqual(Ethertype.IPv6, settings.ethertype)
+        self.assertEqual(Protocol.tcp.value, settings.protocol.value)
+        self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
         self.assertEqual(1, settings.port_range_min)
         self.assertEqual(2, settings.port_range_max)
         self.assertEqual('prfx', settings.remote_ip_prefix)
@@ -104,11 +139,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
     """
 
     def test_no_params(self):
-        with self.assertRaises(SecurityGroupSettingsError):
+        with self.assertRaises(SecurityGroupConfigError):
             SecurityGroupSettings()
 
     def test_empty_config(self):
-        with self.assertRaises(SecurityGroupSettingsError):
+        with self.assertRaises(SecurityGroupConfigError):
             SecurityGroupSettings(**dict())
 
     def test_name_only(self):
@@ -123,7 +158,7 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
         rule_setting = SecurityGroupRuleSettings(
             sec_grp_name='bar', direction=Direction.ingress,
             description='test_rule_1')
-        with self.assertRaises(SecurityGroupSettingsError):
+        with self.assertRaises(SecurityGroupConfigError):
             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
 
     def test_all(self):
@@ -157,8 +192,8 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
         self.assertEqual('foo', settings.project_name)
         self.assertEqual(1, len(settings.rule_settings))
         self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
-        self.assertEqual(Direction.ingress,
-                         settings.rule_settings[0].direction)
+        self.assertEqual(Direction.ingress.value,
+                         settings.rule_settings[0].direction.value)
 
 
 class CreateSecurityGroupTests(OSIntegrationTestCase):
@@ -194,8 +229,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         Tests the creation of an OpenStack Security Group without custom rules.
         """
         # Create Image
-        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
-                                                 description='hello group')
+        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+                                               description='hello group')
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
             self.os_creds, sec_grp_settings)
         self.sec_grp_creator.create()
@@ -222,7 +257,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         Tests the creation of an OpenStack Security Group without custom rules.
         """
         # Create Image
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             project_name=self.admin_os_creds.project_name)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -251,7 +286,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         Tests the creation of an OpenStack Security Group without custom rules.
         """
         # Create Image
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             project_name=self.os_creds.project_name)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -280,8 +315,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         Tests the creation of an OpenStack Security Group without custom rules.
         """
         # Create Image
-        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
-                                                 description='hello group')
+        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+                                               description='hello group')
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
             self.os_creds, sec_grp_settings)
         created_sec_grp = self.sec_grp_creator.create()
@@ -307,10 +342,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                 description='test_rule_1'))
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -340,12 +375,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                 port_range_min=10, port_range_max=20,
                 description='test_rule_1'))
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -375,21 +410,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                 description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                 description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                 port_range_min=10, port_range_max=20,
                 description='test_rule_3'))
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -419,10 +454,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                 description='test_rule_1'))
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -448,7 +483,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                             rules)
 
-        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
+        self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
             sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
             direction=Direction.egress, protocol=Protocol.icmp,
             description='test_rule_2'))
@@ -464,21 +499,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                 description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                 description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                 port_range_min=10, port_range_max=20,
                 description='test_rule_3'))
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -515,21 +550,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
         # Create Image
         sec_grp_rule_settings = list()
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                 description='test_rule_1'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                 description='test_rule_2'))
         sec_grp_rule_settings.append(
-            SecurityGroupRuleSettings(
+            SecurityGroupRuleConfig(
                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                 port_range_min=10, port_range_max=20,
                 description='test_rule_3'))
-        sec_grp_settings = SecurityGroupSettings(
+        sec_grp_settings = SecurityGroupConfig(
             name=self.sec_grp_name, description='hello group',
             rule_settings=sec_grp_rule_settings)
         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -583,7 +618,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
     this is the only means to tell if the rule is custom or defaulted by
     OpenStack
     :param neutron: the neutron client
-    :param rule_settings: collection of SecurityGroupRuleSettings objects
+    :param rule_settings: collection of SecurityGroupRuleConfig objects
     :param rules: a collection of SecurityGroupRule domain objects
     :return: T/F
     """
@@ -592,11 +627,6 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
         if rule_setting.description:
             match = False
             for rule in rules:
-                if rule_setting.protocol == Protocol.null:
-                    setting_proto = None
-                else:
-                    setting_proto = rule_setting.protocol.name
-
                 sec_grp = neutron_utils.get_security_group(
                     neutron, sec_grp_name=rule_setting.sec_grp_name)
 
@@ -607,15 +637,19 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
                 if not sec_grp:
                     return False
 
+                proto_str = 'null'
+                if rule.protocol:
+                    proto_str = rule.protocol
+
                 if (rule.description == rule_setting.description and
                     rule.direction == rule_setting.direction.name and
                     rule.ethertype == setting_eth_type.name and
                     rule.port_range_max == rule_setting.port_range_max and
                     rule.port_range_min == rule_setting.port_range_min and
-                    rule.protocol == setting_proto and
+                    proto_str == str(rule_setting.protocol.value) and
                     rule.remote_group_id == rule_setting.remote_group_id and
                     rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
-                    rule.security_group_id == sec_grp.id):
+                        rule.security_group_id == sec_grp.id):
                     match = True
                     break