Supporting the protocol string value of 'any' for security group rules. 69/47369/1
authorspisarski <s.pisarski@cablelabs.com>
Thu, 16 Nov 2017 17:51:30 +0000 (10:51 -0700)
committerspisarski <s.pisarski@cablelabs.com>
Thu, 16 Nov 2017 17:51:30 +0000 (10:51 -0700)
This issue had been found by Stuart Mackie when using OpenContrail.

Change-Id: I5b57773e19c20028ac736521d461d49341c78cb0
Signed-off-by: spisarski <s.pisarski@cablelabs.com>
snaps/openstack/create_security_group.py
snaps/openstack/tests/create_security_group_tests.py

index 5df33f3..fcad584 100644 (file)
@@ -304,9 +304,29 @@ class Protocol(enum.Enum):
     """
     A rule's protocol
     """
-    icmp = 'icmp'
-    tcp = 'tcp'
-    udp = 'udp'
+    ah = 51
+    dccp = 33
+    egp = 8
+    esp = 50
+    gre = 47
+    icmp = 1
+    icmpv6 = 58
+    igmp = 2
+    ipv6_encap = 41
+    ipv6_frag = 44
+    ipv6_icmp = 58
+    ipv6_nonxt = 59
+    ipv6_opts = 60
+    ipv6_route = 43
+    ospf = 89
+    pgm = 113
+    rsvp = 46
+    sctp = 132
+    tcp = 6
+    udp = 17
+    udplite = 136
+    vrrp = 112
+    any = 'any'
     null = 'null'
 
 
@@ -406,8 +426,8 @@ class SecurityGroupRuleSettings:
             out['port_range_max'] = self.port_range_max
         if self.ethertype:
             out['ethertype'] = self.ethertype.name
-        if self.protocol and self.protocol.name != 'null':
-            out['protocol'] = self.protocol.name
+        if self.protocol and self.protocol.value != 'null':
+            out['protocol'] = self.protocol.value
         if self.sec_grp_name:
             sec_grp = neutron_utils.get_security_group(
                 neutron, sec_grp_name=self.sec_grp_name)
@@ -522,18 +542,13 @@ def map_protocol(protocol):
     elif isinstance(protocol, Protocol):
         return protocol
     else:
-        proto_str = str(protocol)
-        if proto_str == 'icmp':
-            return Protocol.icmp
-        elif proto_str == 'tcp':
-            return Protocol.tcp
-        elif proto_str == 'udp':
-            return Protocol.udp
-        elif proto_str == 'null':
-            return Protocol.null
-        else:
-            raise SecurityGroupRuleSettingsError(
-                'Invalid Protocol - ' + proto_str)
+        for proto_enum in Protocol:
+            if proto_enum.name == protocol or proto_enum.value == protocol:
+                if proto_enum == Protocol.any:
+                    return Protocol.null
+                return proto_enum
+        raise SecurityGroupRuleSettingsError(
+            'Invalid Protocol - ' + protocol)
 
 
 def map_ethertype(ethertype):
index 99ea53a..ed62548 100644 (file)
@@ -59,6 +59,38 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
         self.assertEqual('foo', settings.sec_grp_name)
         self.assertEqual(Direction.ingress, settings.direction)
 
+    def test_proto_ah_str(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 'ah'})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress, settings.direction)
+        self.assertEqual(Protocol.ah, settings.protocol)
+
+    def test_proto_ah_value(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 51})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress, settings.direction)
+        self.assertEqual(Protocol.ah, settings.protocol)
+
+    def test_proto_any(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 'any'})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress, settings.direction)
+        self.assertEqual(Protocol.null, settings.protocol)
+
+    def test_proto_null(self):
+        settings = SecurityGroupRuleSettings(
+            **{'sec_grp_name': 'foo', 'direction': 'ingress',
+               'protocol': 'null'})
+        self.assertEqual('foo', settings.sec_grp_name)
+        self.assertEqual(Direction.ingress, settings.direction)
+        self.assertEqual(Protocol.null, settings.protocol)
+
     def test_all(self):
         settings = SecurityGroupRuleSettings(
             sec_grp_name='foo', description='fubar',
@@ -592,11 +624,6 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
         if rule_setting.description:
             match = False
             for rule in rules:
-                if rule_setting.protocol == Protocol.null:
-                    setting_proto = None
-                else:
-                    setting_proto = rule_setting.protocol.name
-
                 sec_grp = neutron_utils.get_security_group(
                     neutron, sec_grp_name=rule_setting.sec_grp_name)
 
@@ -607,12 +634,16 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
                 if not sec_grp:
                     return False
 
+                proto_str = 'null'
+                if rule.protocol:
+                    proto_str = rule.protocol
+
                 if (rule.description == rule_setting.description and
                     rule.direction == rule_setting.direction.name and
                     rule.ethertype == setting_eth_type.name and
                     rule.port_range_max == rule_setting.port_range_max and
                     rule.port_range_min == rule_setting.port_range_min and
-                    rule.protocol == setting_proto and
+                    proto_str == str(rule_setting.protocol.value) and
                     rule.remote_group_id == rule_setting.remote_group_id and
                     rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
                     rule.security_group_id == sec_grp.id):