-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import uuid
import unittest
+import uuid
+from snaps.config.security_group import (
+ SecurityGroupConfig, SecurityGroupRuleConfig,
+ SecurityGroupRuleConfigError, SecurityGroupConfigError)
from snaps.openstack import create_security_group
-from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \
- Ethertype, Protocol
+from snaps.openstack.create_security_group import (
+ SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
+ Protocol)
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
- SecurityGroupRuleSettings(config=dict())
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleSettings(**dict())
def test_name_only(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(sec_grp_name='foo')
def test_config_with_name_only(self):
- with self.assertRaises(Exception):
- SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'})
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
def test_name_and_direction(self):
- settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ settings = SecurityGroupRuleSettings(sec_grp_name='foo',
+ direction=Direction.ingress)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
def test_config_name_and_direction(self):
- settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+
+ def test_proto_ah_str(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'ah'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.ah.value, settings.protocol.value)
+
+ def test_proto_ah_value(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 51})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.ah.value, settings.protocol.value)
+
+ def test_proto_any(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'any'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.null.value, settings.protocol.value)
+
+ def test_proto_null(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'null'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.null.value, settings.protocol.value)
def test_all(self):
settings = SecurityGroupRuleSettings(
- sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
- protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
+ sec_grp_name='foo', description='fubar',
+ direction=Direction.egress, remote_group_id='rgi',
+ protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
+ port_range_max=2,
remote_ip_prefix='prfx')
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.icmp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress.value, settings.direction.value)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.icmp.value, settings.protocol.value)
+ self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
def test_config_all(self):
settings = SecurityGroupRuleSettings(
- config={'sec_grp_name': 'foo',
- 'description': 'fubar',
- 'direction': 'egress',
- 'remote_group_id': 'rgi',
- 'protocol': 'tcp',
- 'ethertype': 'IPv6',
- 'port_range_min': 1,
- 'port_range_max': 2,
- 'remote_ip_prefix': 'prfx'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.tcp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ **{'sec_grp_name': 'foo',
+ 'description': 'fubar',
+ 'direction': 'egress',
+ 'remote_group_id': 'rgi',
+ 'protocol': 'tcp',
+ 'ethertype': 'IPv6',
+ 'port_range_min': 1,
+ 'port_range_max': 2,
+ 'remote_ip_prefix': 'prfx'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress.value, settings.direction.value)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.tcp.value, settings.protocol.value)
+ self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
- SecurityGroupSettings(config=dict())
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupSettings(**dict())
def test_name_only(self):
settings = SecurityGroupSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_config_with_name_only(self):
- settings = SecurityGroupSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ settings = SecurityGroupSettings(**{'name': 'foo'})
+ self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
- rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
- with self.assertRaises(Exception):
+ rule_setting = SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
rule_settings = list()
- rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress))
- rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress))
+ rule_settings.append(SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
+ rule_settings.append(SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
settings = SecurityGroupSettings(
- name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
+ name='bar', description='fubar', project_name='foo',
+ rule_settings=rule_settings)
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(rule_settings[0], settings.rule_settings[0])
- self.assertEquals(rule_settings[1], settings.rule_settings[1])
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(rule_settings[0], settings.rule_settings[0])
+ self.assertEqual(rule_settings[1], settings.rule_settings[1])
def test_config_all(self):
settings = SecurityGroupSettings(
- config={'name': 'bar',
- 'description': 'fubar',
- 'project_name': 'foo',
- 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
+ **{'name': 'bar',
+ 'description': 'fubar',
+ 'project_name': 'foo',
+ 'rules': [
+ {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(1, len(settings.rule_settings))
- self.assertEquals('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEquals(Direction.ingress, settings.rule_settings[0].direction)
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(1, len(settings.rule_settings))
+ self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEqual(Direction.ingress.value,
+ settings.rule_settings[0].direction.value)
class CreateSecurityGroupTests(OSIntegrationTestCase):
def setUp(self):
"""
- Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateSecurityGroup object that is responsible for
+ downloading and creating an OS image file within OpenStack
"""
super(self.__class__, self).__start__()
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+ description='hello group')
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ self.assertIsNotNone(sec_grp)
+
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
+ def test_create_group_admin_user_to_new_project(self):
+ """
+ Tests the creation of an OpenStack Security Group without custom rules.
+ """
+ # Create Image
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ project_name=self.admin_os_creds.project_name)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ def test_create_group_new_user_to_admin_project(self):
+ """
+ Tests the creation of an OpenStack Security Group without custom rules.
+ """
+ # Create Image
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ project_name=self.os_creds.project_name)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.admin_os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ self.assertIsNotNone(sec_grp)
+
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+ description='hello group')
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
created_sec_grp = self.sec_grp_creator.create()
self.assertIsNotNone(created_sec_grp)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
- self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name))
+ self.assertIsNone(neutron_utils.get_security_group(
+ self.neutron,
+ sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
self.sec_grp_creator.clean()
def test_create_group_with_one_simple_rule(self):
"""
- Tests the creation of an OpenStack Security Group with one simple custom rule.
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ def test_create_group_with_one_complex_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
def test_create_group_with_several_rules(self):
"""
- Tests the creation of an OpenStack Security Group with one simple custom rule.
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
def test_add_rule(self):
"""
- Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation.
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule then adds one after creation.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
-
- self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
- direction=Direction.egress, protocol=Protocol.icmp))
- rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) + 1, len(rules2))
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
+ direction=Direction.egress, protocol=Protocol.icmp,
+ description='test_rule_2'))
+ rules2 = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(rules) + 1, len(rules2))
def test_remove_rule_by_id(self):
"""
- Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID.
+ Tests the creation of an OpenStack Security Group with two simple
+ custom rules then removes one by the rule ID.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
-
- self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
- rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ self.sec_grp_creator.remove_rule(
+ rule_id=rules[0].id)
+ rules_after_del = neutron_utils.get_rules_by_security_group(
+ self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
def test_remove_rule_by_setting(self):
"""
- Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule
- setting object
+ Tests the creation of an OpenStack Security Group with two simple
+ custom rules then removes one by the rule setting object
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
- rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ rules_after_del = neutron_utils.get_rules_by_security_group(
+ self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
+
+
+def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+ """
+ Returns True is the settings on a security group are properly contained
+ on the SNAPS SecurityGroup domain object
+ :param neutron: the neutron client
+ :param sec_grp_settings: the security group configuration
+ :param sec_grp: the SNAPS-OO security group object
+ :param rules: collection of SNAPS-OO security group rule objects
+ :return: T/F
+ """
+ return (sec_grp.description == sec_grp_settings.description and
+ sec_grp.name == sec_grp_settings.name and
+ validate_sec_grp_rules(
+ neutron, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, rule_settings, rules):
+ """
+ Returns True is the settings on a security group rule are properly
+ contained on the SNAPS SecurityGroupRule domain object.
+ This function will only operate on rules that contain a description as
+ this is the only means to tell if the rule is custom or defaulted by
+ OpenStack
+ :param neutron: the neutron client
+ :param rule_settings: collection of SecurityGroupRuleConfig objects
+ :param rules: a collection of SecurityGroupRule domain objects
+ :return: T/F
+ """
-# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex
+ for rule_setting in rule_settings:
+ if rule_setting.description:
+ match = False
+ for rule in rules:
+ sec_grp = neutron_utils.get_security_group(
+ neutron, sec_grp_name=rule_setting.sec_grp_name)
+
+ setting_eth_type = create_security_group.Ethertype.IPv4
+ if rule_setting.ethertype:
+ setting_eth_type = rule_setting.ethertype
+
+ if not sec_grp:
+ return False
+
+ proto_str = 'null'
+ if rule.protocol:
+ proto_str = rule.protocol
+
+ if (rule.description == rule_setting.description and
+ rule.direction == rule_setting.direction.name and
+ rule.ethertype == setting_eth_type.name and
+ rule.port_range_max == rule_setting.port_range_max and
+ rule.port_range_min == rule_setting.port_range_min and
+ proto_str == str(rule_setting.protocol.value) and
+ rule.remote_group_id == rule_setting.remote_group_id and
+ rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+ rule.security_group_id == sec_grp.id):
+ match = True
+ break
+
+ if not match:
+ return False
+
+ return True