1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \
21 from snaps.openstack.tests import validation_utils
22 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
23 from snaps.openstack.utils import neutron_utils
25 __author__ = 'spisarski'
28 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
30 Tests the construction of the SecurityGroupRuleSettings class
33 def test_no_params(self):
34 with self.assertRaises(Exception):
35 SecurityGroupRuleSettings()
37 def test_empty_config(self):
38 with self.assertRaises(Exception):
39 SecurityGroupRuleSettings(config=dict())
41 def test_name_only(self):
42 with self.assertRaises(Exception):
43 SecurityGroupRuleSettings(sec_grp_name='foo')
45 def test_config_with_name_only(self):
46 with self.assertRaises(Exception):
47 SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'})
49 def test_name_and_direction(self):
50 settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
51 self.assertEqual('foo', settings.sec_grp_name)
52 self.assertEqual(Direction.ingress, settings.direction)
54 def test_config_name_and_direction(self):
55 settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
56 self.assertEqual('foo', settings.sec_grp_name)
57 self.assertEqual(Direction.ingress, settings.direction)
60 settings = SecurityGroupRuleSettings(
61 sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
62 protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
63 remote_ip_prefix='prfx')
64 self.assertEqual('foo', settings.sec_grp_name)
65 self.assertEqual('fubar', settings.description)
66 self.assertEqual(Direction.egress, settings.direction)
67 self.assertEqual('rgi', settings.remote_group_id)
68 self.assertEqual(Protocol.icmp, settings.protocol)
69 self.assertEqual(Ethertype.IPv6, settings.ethertype)
70 self.assertEqual(1, settings.port_range_min)
71 self.assertEqual(2, settings.port_range_max)
72 self.assertEqual('prfx', settings.remote_ip_prefix)
74 def test_config_all(self):
75 settings = SecurityGroupRuleSettings(
76 config={'sec_grp_name': 'foo',
77 'description': 'fubar',
78 'direction': 'egress',
79 'remote_group_id': 'rgi',
84 'remote_ip_prefix': 'prfx'})
85 self.assertEqual('foo', settings.sec_grp_name)
86 self.assertEqual('fubar', settings.description)
87 self.assertEqual(Direction.egress, settings.direction)
88 self.assertEqual('rgi', settings.remote_group_id)
89 self.assertEqual(Protocol.tcp, settings.protocol)
90 self.assertEqual(Ethertype.IPv6, settings.ethertype)
91 self.assertEqual(1, settings.port_range_min)
92 self.assertEqual(2, settings.port_range_max)
93 self.assertEqual('prfx', settings.remote_ip_prefix)
96 class SecurityGroupSettingsUnitTests(unittest.TestCase):
98 Tests the construction of the SecurityGroupSettings class
101 def test_no_params(self):
102 with self.assertRaises(Exception):
103 SecurityGroupSettings()
105 def test_empty_config(self):
106 with self.assertRaises(Exception):
107 SecurityGroupSettings(config=dict())
109 def test_name_only(self):
110 settings = SecurityGroupSettings(name='foo')
111 self.assertEqual('foo', settings.name)
113 def test_config_with_name_only(self):
114 settings = SecurityGroupSettings(config={'name': 'foo'})
115 self.assertEqual('foo', settings.name)
117 def test_invalid_rule(self):
118 rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
119 with self.assertRaises(Exception):
120 SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
123 rule_settings = list()
124 rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress))
125 rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress))
126 settings = SecurityGroupSettings(
127 name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
129 self.assertEqual('bar', settings.name)
130 self.assertEqual('fubar', settings.description)
131 self.assertEqual('foo', settings.project_name)
132 self.assertEqual(rule_settings[0], settings.rule_settings[0])
133 self.assertEqual(rule_settings[1], settings.rule_settings[1])
135 def test_config_all(self):
136 settings = SecurityGroupSettings(
137 config={'name': 'bar',
138 'description': 'fubar',
139 'project_name': 'foo',
140 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
142 self.assertEqual('bar', settings.name)
143 self.assertEqual('fubar', settings.description)
144 self.assertEqual('foo', settings.project_name)
145 self.assertEqual(1, len(settings.rule_settings))
146 self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
147 self.assertEqual(Direction.ingress, settings.rule_settings[0].direction)
150 class CreateSecurityGroupTests(OSIntegrationTestCase):
152 Test for the CreateSecurityGroup class defined in create_security_group.py
157 Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file
160 super(self.__class__, self).__start__()
162 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
163 self.sec_grp_name = guid + 'name'
164 self.neutron = neutron_utils.neutron_client(self.os_creds)
166 # Initialize for cleanup
167 self.sec_grp_creator = None
171 Cleans the image and downloaded image file
173 if self.sec_grp_creator:
174 self.sec_grp_creator.clean()
176 super(self.__class__, self).__clean__()
178 def test_create_group_without_rules(self):
180 Tests the creation of an OpenStack Security Group without custom rules.
183 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
184 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
185 self.sec_grp_creator.create()
187 sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
188 self.assertIsNotNone(sec_grp)
190 validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
191 rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
192 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
193 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
195 def test_create_delete_group(self):
197 Tests the creation of an OpenStack Security Group without custom rules.
200 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
201 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
202 created_sec_grp = self.sec_grp_creator.create()
203 self.assertIsNotNone(created_sec_grp)
205 neutron_utils.delete_security_group(self.neutron, created_sec_grp)
206 self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name))
208 self.sec_grp_creator.clean()
210 def test_create_group_with_one_simple_rule(self):
212 Tests the creation of an OpenStack Security Group with one simple custom rule.
215 sec_grp_rule_settings = list()
216 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
217 direction=Direction.ingress))
218 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
219 rule_settings=sec_grp_rule_settings)
220 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
221 self.sec_grp_creator.create()
223 sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
224 validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
225 rules = neutron_utils.get_rules_by_security_group(self.neutron,
226 self.sec_grp_creator.get_security_group())
227 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
228 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
230 def test_create_group_with_several_rules(self):
232 Tests the creation of an OpenStack Security Group with one simple custom rule.
235 sec_grp_rule_settings = list()
236 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
237 direction=Direction.ingress))
238 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
239 direction=Direction.egress,
240 protocol=Protocol.udp,
241 ethertype=Ethertype.IPv6))
242 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
243 direction=Direction.egress,
244 protocol=Protocol.udp,
245 ethertype=Ethertype.IPv4,
248 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
249 rule_settings=sec_grp_rule_settings)
250 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
251 self.sec_grp_creator.create()
253 sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
254 validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
255 rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
256 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
257 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
259 def test_add_rule(self):
261 Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation.
264 sec_grp_rule_settings = list()
265 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
266 direction=Direction.ingress))
267 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
268 rule_settings=sec_grp_rule_settings)
269 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
270 self.sec_grp_creator.create()
272 sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
273 validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
274 rules = neutron_utils.get_rules_by_security_group(self.neutron,
275 self.sec_grp_creator.get_security_group())
276 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
277 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
279 self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
280 direction=Direction.egress, protocol=Protocol.icmp))
281 rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
282 self.assertEqual(len(rules) + 1, len(rules2))
284 def test_remove_rule_by_id(self):
286 Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID.
289 sec_grp_rule_settings = list()
290 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
291 direction=Direction.ingress))
292 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
293 direction=Direction.egress,
294 protocol=Protocol.udp,
295 ethertype=Ethertype.IPv6))
296 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
297 direction=Direction.egress,
298 protocol=Protocol.udp,
299 ethertype=Ethertype.IPv4,
302 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
303 rule_settings=sec_grp_rule_settings)
304 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
305 self.sec_grp_creator.create()
307 sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
308 validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
309 rules = neutron_utils.get_rules_by_security_group(self.neutron,
310 self.sec_grp_creator.get_security_group())
311 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
312 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
314 self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
315 rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
316 self.sec_grp_creator.get_security_group())
317 self.assertEqual(len(rules) - 1, len(rules_after_del))
319 def test_remove_rule_by_setting(self):
321 Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule
325 sec_grp_rule_settings = list()
326 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
327 direction=Direction.ingress))
328 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
329 direction=Direction.egress,
330 protocol=Protocol.udp,
331 ethertype=Ethertype.IPv6))
332 sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
333 direction=Direction.egress,
334 protocol=Protocol.udp,
335 ethertype=Ethertype.IPv4,
338 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
339 rule_settings=sec_grp_rule_settings)
340 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
341 self.sec_grp_creator.create()
343 sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
344 validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
345 rules = neutron_utils.get_rules_by_security_group(self.neutron,
346 self.sec_grp_creator.get_security_group())
347 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
348 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
350 self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
351 rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
352 self.sec_grp_creator.get_security_group())
353 self.assertEqual(len(rules) - 1, len(rules_after_del))
355 # TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex