1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
import unittest
import uuid

from snaps.openstack import create_security_group
from snaps.openstack.create_security_group import (
    SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
    Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
26 __author__ = 'spisarski'
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the SecurityGroupRuleSettings class
    """

    def test_no_params(self):
        """
        Expects SecurityGroupRuleSettingsError when no arguments are given
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings()

    def test_empty_config(self):
        """
        Expects SecurityGroupRuleSettingsError for an empty config dict
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(**{})

    def test_name_only(self):
        """
        Expects SecurityGroupRuleSettingsError when only the security group
        name is supplied as a keyword
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(sec_grp_name='foo')

    def test_config_with_name_only(self):
        """
        Expects SecurityGroupRuleSettingsError when only the security group
        name is supplied through a config dict
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})

    def test_name_and_direction(self):
        """
        Name plus direction is accepted and both values are readable back
        """
        settings = SecurityGroupRuleSettings(sec_grp_name='foo',
                                             direction=Direction.ingress)
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual(Direction.ingress, settings.direction)

    def test_config_name_and_direction(self):
        """
        Same as test_name_and_direction but via a config dict; the string
        'ingress' is expected to come back as Direction.ingress
        """
        settings = SecurityGroupRuleSettings(
            **{'sec_grp_name': 'foo', 'direction': 'ingress'})
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual(Direction.ingress, settings.direction)

    def test_all(self):
        """
        Every keyword argument is supplied and read back unchanged
        """
        # port_range_max is required by the assertion below; it was absent
        # from the argument list in the reviewed listing
        settings = SecurityGroupRuleSettings(
            sec_grp_name='foo', description='fubar',
            direction=Direction.egress, remote_group_id='rgi',
            protocol=Protocol.icmp, ethertype=Ethertype.IPv6,
            port_range_min=1, port_range_max=2,
            remote_ip_prefix='prfx')
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual(Direction.egress, settings.direction)
        self.assertEqual('rgi', settings.remote_group_id)
        self.assertEqual(Protocol.icmp, settings.protocol)
        self.assertEqual(Ethertype.IPv6, settings.ethertype)
        self.assertEqual(1, settings.port_range_min)
        self.assertEqual(2, settings.port_range_max)
        self.assertEqual('prfx', settings.remote_ip_prefix)

    def test_config_all(self):
        """
        Every setting is supplied through a config dict and read back
        """
        # NOTE(review): the 'protocol', 'ethertype' and port range entries
        # were missing from the reviewed listing and are rebuilt from the
        # assertions below. The spellings 'tcp' and 'IPv6' assume the same
        # name-to-enum conversion that 'direction': 'egress' relies on --
        # confirm against SecurityGroupRuleSettings.
        settings = SecurityGroupRuleSettings(
            **{'sec_grp_name': 'foo',
               'description': 'fubar',
               'direction': 'egress',
               'remote_group_id': 'rgi',
               'protocol': 'tcp',
               'ethertype': 'IPv6',
               'port_range_min': 1,
               'port_range_max': 2,
               'remote_ip_prefix': 'prfx'})
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual(Direction.egress, settings.direction)
        self.assertEqual('rgi', settings.remote_group_id)
        self.assertEqual(Protocol.tcp, settings.protocol)
        self.assertEqual(Ethertype.IPv6, settings.ethertype)
        self.assertEqual(1, settings.port_range_min)
        self.assertEqual(2, settings.port_range_max)
        self.assertEqual('prfx', settings.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the SecurityGroupSettings class
    """

    def test_no_params(self):
        """
        Expects SecurityGroupSettingsError when no arguments are given
        """
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings()

    def test_empty_config(self):
        """
        Expects SecurityGroupSettingsError for an empty config dict
        """
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings(**{})

    def test_name_only(self):
        """
        A name alone is sufficient and is readable back
        """
        settings = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', settings.name)

    def test_config_with_name_only(self):
        """
        A config dict holding only the name is sufficient
        """
        settings = SecurityGroupSettings(**{'name': 'foo'})
        self.assertEqual('foo', settings.name)

    def test_invalid_rule(self):
        """
        Expects SecurityGroupSettingsError when the group named 'foo' is
        given a rule setting whose sec_grp_name is 'bar'
        """
        rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
                                                 direction=Direction.ingress)
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings(name='foo', rule_settings=[rule_setting])

    def test_all(self):
        """
        Every keyword argument is supplied and read back; the rule settings
        are expected back in the order they were passed
        """
        rule_settings = [
            SecurityGroupRuleSettings(
                sec_grp_name='bar', direction=Direction.egress),
            SecurityGroupRuleSettings(
                sec_grp_name='bar', direction=Direction.ingress),
        ]
        settings = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo',
            rule_settings=rule_settings)

        self.assertEqual('bar', settings.name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual('foo', settings.project_name)
        self.assertEqual(rule_settings[0], settings.rule_settings[0])
        self.assertEqual(rule_settings[1], settings.rule_settings[1])

    def test_config_all(self):
        """
        Every setting is supplied through a config dict, including one rule
        given as a nested dict
        """
        # The 'name' entry and the 'rule_settings' key were missing from the
        # reviewed listing; both are required by the assertions below
        settings = SecurityGroupSettings(
            **{'name': 'bar',
               'description': 'fubar',
               'project_name': 'foo',
               'rule_settings': [
                   {'sec_grp_name': 'bar', 'direction': 'ingress'}]})

        self.assertEqual('bar', settings.name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual('foo', settings.project_name)
        self.assertEqual(1, len(settings.rule_settings))
        self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
        self.assertEqual(Direction.ingress,
                         settings.rule_settings[0].direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Test for the CreateSecurityGroup class defined in create_security_group.py
    """

    # NOTE(review): the def lines of setUp and tearDown were missing from the
    # reviewed listing; the names follow the unittest fixture convention.

    def setUp(self):
        """
        Runs the base class start-up, then prepares a unique security group
        name and a neutron client built from self.os_creds
        """
        # The class is named explicitly: super(self.__class__, self) recurses
        # without end as soon as this test case is subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group creator when a test set one, then runs
        the base class clean-up
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _create_group(self, os_creds, **settings_kwargs):
        """
        Builds the settings for self.sec_grp_name, stores the creator in
        self.sec_grp_creator and creates the security group
        :param os_creds: the credentials handed to OpenStackSecurityGroup
        :param settings_kwargs: extra SecurityGroupSettings keyword arguments
                                (e.g. rule_settings, project_name)
        :return: whatever OpenStackSecurityGroup.create() returns
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            **settings_kwargs)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            os_creds, sec_grp_settings)
        return self.sec_grp_creator.create()

    def _validate_against_neutron(self):
        """
        Looks the created security group up through neutron and compares it
        and its rules with what the creator reports
        :return: the rules neutron returns for the security group
        """
        # NOTE(review): the second argument of this lookup was missing from
        # the reviewed listing; the group name is passed exactly as
        # test_create_delete_group does -- confirm.
        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name)
        self.assertIsNotNone(sec_grp)

        # NOTE(review): the value returned by objects_equivalent() is
        # discarded here, as in the original. If it returns a bool rather
        # than raising, these two calls verify nothing and should be wrapped
        # in assertTrue -- confirm against validation_utils.
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)
        return rules

    def _several_rule_settings(self):
        """
        Returns the three rule settings shared by the multi-rule tests: one
        ingress rule and two egress UDP rules (IPv6 and IPv4)
        """
        # NOTE(review): in the reviewed listing the keyword arguments of the
        # third rule continue after ethertype on line(s) that are missing;
        # they are not reproduced here -- restore them from the original.
        return [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv6),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv4),
        ]

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        self._create_group(self.os_creds)
        self._validate_against_neutron()

    def test_create_group_admin_user_to_new_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules, using self.os_creds while naming the project of
        self.admin_os_creds in the settings.
        """
        self._create_group(
            self.os_creds, project_name=self.admin_os_creds.project_name)
        self._validate_against_neutron()

    def test_create_group_new_user_to_admin_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules, using self.admin_os_creds while naming the project of
        self.os_creds in the settings.
        """
        self._create_group(
            self.admin_os_creds, project_name=self.os_creds.project_name)
        self._validate_against_neutron()

    def test_create_delete_group(self):
        """
        Tests that a created OpenStack Security Group can be deleted
        directly through neutron_utils and that clean() can still be called
        afterwards.
        """
        created_sec_grp = self._create_group(self.os_creds)
        self.assertIsNotNone(created_sec_grp)

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        # The group is already gone at this point; the test only passes if
        # clean() does not raise
        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress)]
        self._create_group(self.os_creds,
                           rule_settings=sec_grp_rule_settings)
        self._validate_against_neutron()

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        self._create_group(self.os_creds,
                           rule_settings=self._several_rule_settings())
        self._validate_against_neutron()

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress)]
        self._create_group(self.os_creds,
                           rule_settings=sec_grp_rule_settings)
        rules = self._validate_against_neutron()

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp))
        rules2 = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        self._create_group(self.os_creds,
                           rule_settings=self._several_rule_settings())
        rules = self._validate_against_neutron()

        # NOTE(review): the argument line of this call was missing from the
        # reviewed listing. Both the 'rule_id' keyword and the way the ID is
        # read out of the first rule are assumptions -- confirm against
        # OpenStackSecurityGroup.remove_rule and the structure returned by
        # neutron_utils.get_rules_by_security_group.
        self.sec_grp_creator.remove_rule(
            rule_id=rules[0]['security_group_rule']['id'])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        sec_grp_rule_settings = self._several_rule_settings()
        self._create_group(self.os_creds,
                           rule_settings=sec_grp_rule_settings)
        rules = self._validate_against_neutron()

        self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))
469 # TODO - Add more tests with different rules. Rule creation parameters can be