1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (
20 SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
21 Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
22 from snaps.openstack.tests import validation_utils
23 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
24 from snaps.openstack.utils import neutron_utils
26 __author__ = 'spisarski'
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the SecurityGroupRuleSettings class
    """

    def test_no_params(self):
        """
        Constructing a rule without any arguments must raise
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings()

    def test_empty_config(self):
        """
        Constructing a rule from an empty config dict must raise
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(**dict())

    def test_name_only(self):
        """
        A security group name without a direction must raise
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(sec_grp_name='foo')

    def test_config_with_name_only(self):
        """
        A config dict holding only the security group name must raise
        """
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})

    def test_name_and_direction(self):
        """
        Security group name plus direction is accepted and stored
        """
        settings = SecurityGroupRuleSettings(sec_grp_name='foo',
                                             direction=Direction.ingress)
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual(Direction.ingress, settings.direction)

    def test_config_name_and_direction(self):
        """
        The string 'ingress' in a config dict is exposed as
        Direction.ingress
        """
        settings = SecurityGroupRuleSettings(
            **{'sec_grp_name': 'foo', 'direction': 'ingress'})
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual(Direction.ingress, settings.direction)

    def test_all(self):
        """
        Every keyword argument is stored on the settings object
        """
        settings = SecurityGroupRuleSettings(
            sec_grp_name='foo', description='fubar',
            direction=Direction.egress, remote_group_id='rgi',
            protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
            port_range_max=2,
            remote_ip_prefix='prfx')
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual(Direction.egress, settings.direction)
        self.assertEqual('rgi', settings.remote_group_id)
        self.assertEqual(Protocol.icmp, settings.protocol)
        self.assertEqual(Ethertype.IPv6, settings.ethertype)
        self.assertEqual(1, settings.port_range_min)
        self.assertEqual(2, settings.port_range_max)
        self.assertEqual('prfx', settings.remote_ip_prefix)

    def test_config_all(self):
        """
        Every config dict entry is stored, with string values exposed as
        their Direction/Protocol/Ethertype members
        """
        settings = SecurityGroupRuleSettings(
            **{'sec_grp_name': 'foo',
               'description': 'fubar',
               'direction': 'egress',
               'remote_group_id': 'rgi',
               'protocol': 'tcp',
               'ethertype': 'IPv6',
               'port_range_min': 1,
               'port_range_max': 2,
               'remote_ip_prefix': 'prfx'})
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual(Direction.egress, settings.direction)
        self.assertEqual('rgi', settings.remote_group_id)
        self.assertEqual(Protocol.tcp, settings.protocol)
        self.assertEqual(Ethertype.IPv6, settings.ethertype)
        self.assertEqual(1, settings.port_range_min)
        self.assertEqual(2, settings.port_range_max)
        self.assertEqual('prfx', settings.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the SecurityGroupSettings class
    """

    def test_no_params(self):
        """
        Constructing a group without any arguments must raise
        """
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings()

    def test_empty_config(self):
        """
        Constructing a group from an empty config dict must raise
        """
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings(**dict())

    def test_name_only(self):
        """
        A name alone is accepted and stored
        """
        settings = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', settings.name)

    def test_config_with_name_only(self):
        """
        A config dict holding only the name is accepted and stored
        """
        settings = SecurityGroupSettings(**{'name': 'foo'})
        self.assertEqual('foo', settings.name)

    def test_invalid_rule(self):
        """
        A rule naming security group 'bar' must be rejected by a group
        named 'foo'
        """
        rule_setting = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.ingress,
            description='test_rule_1')
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings(name='foo', rule_settings=[rule_setting])

    def test_all(self):
        """
        Every keyword argument, including the rule settings in their
        given order, is stored on the settings object
        """
        rule_settings = list()
        rule_settings.append(SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.egress,
            description='test_rule_1'))
        rule_settings.append(SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.ingress,
            description='test_rule_2'))
        settings = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo',
            rule_settings=rule_settings)

        self.assertEqual('bar', settings.name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual('foo', settings.project_name)
        self.assertEqual(rule_settings[0], settings.rule_settings[0])
        self.assertEqual(rule_settings[1], settings.rule_settings[1])

    def test_config_all(self):
        """
        Every config dict entry is stored and the nested rule dict is
        exposed as a rule settings object
        """
        settings = SecurityGroupSettings(
            **{'name': 'bar',
               'description': 'fubar',
               'project_name': 'foo',
               'rule_settings': [
                   {'sec_grp_name': 'bar', 'direction': 'ingress'}]})

        self.assertEqual('bar', settings.name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual('foo', settings.project_name)
        self.assertEqual(1, len(settings.rule_settings))
        self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
        self.assertEqual(Direction.ingress,
                         settings.rule_settings[0].direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Test for the OpenStackSecurityGroup class defined in
    create_security_group.py
    """

    def setUp(self):
        """
        Generates a unique security group name and obtains the neutron
        client used to verify what each test creates
        """
        # The class is named explicitly because super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group creator when a test assigned one
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        self.assertIsNotNone(sec_grp)

        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group()))

    def test_create_group_admin_user_to_new_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules where the creator uses the test credentials and the settings
        name the project of the admin credentials.
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            project_name=self.admin_os_creds.project_name)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        self.assertIsNotNone(sec_grp)

        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_group_new_user_to_admin_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules where the creator uses the admin credentials and the settings
        name the project of the test credentials.
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            project_name=self.os_creds.project_name)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.admin_os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        self.assertIsNotNone(sec_grp)

        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_delete_group(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules, deletes it directly through neutron, then calls clean() on
        the creator once the group can no longer be found by name.
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        created_sec_grp = self.sec_grp_creator.create()
        self.assertIsNotNone(created_sec_grp)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group()))

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_group_with_one_complex_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one custom
        rule that sets protocol, ethertype and a port range.
        """
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_1'))
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'))
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp,
            description='test_rule_2'))
        rules2 = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'))
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        # NOTE(review): assumes remove_rule() takes the rule's ID through a
        # 'rule_id' keyword and that rule objects expose it as '.id' -
        # confirm against create_security_group.py
        self.sec_grp_creator.remove_rule(
            rule_id=rules[0].id)
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'))
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))
def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=None):
    """
    Returns True if the settings on a security group are properly contained
    on the SNAPS SecurityGroup domain object
    :param neutron: the neutron client
    :param sec_grp_settings: the security group configuration
    :param sec_grp: the SNAPS-OO security group object
    :param rules: collection of SNAPS-OO security group rule objects
                  (an empty list is used when omitted)
    :return: T/F
    """
    # A None sentinel replaces the former list() default so that no single
    # list object is shared between calls
    if rules is None:
        rules = []

    # The rule comparison is only reached once description and name match
    return (sec_grp.description == sec_grp_settings.description and
            sec_grp.name == sec_grp_settings.name and
            validate_sec_grp_rules(
                neutron, sec_grp_settings.rule_settings, rules))
577 def validate_sec_grp_rules(neutron, rule_settings, rules):
579 Returns True is the settings on a security group rule are properly
580 contained on the SNAPS SecurityGroupRule domain object.
581 This function will only operate on rules that contain a description as
582 this is the only means to tell if the rule is custom or defaulted by
584 :param neutron: the neutron client
585 :param rule_settings: collection of SecurityGroupRuleSettings objects
586 :param rules: a collection of SecurityGroupRule domain objects
590 for rule_setting in rule_settings:
591 if rule_setting.description:
594 if rule_setting.protocol == Protocol.null:
597 setting_proto = rule_setting.protocol.name
599 sec_grp = neutron_utils.get_security_group(
600 neutron, rule_setting.sec_grp_name)
602 setting_eth_type = create_security_group.Ethertype.IPv4
603 if rule_setting.ethertype:
604 setting_eth_type = rule_setting.ethertype
609 if (rule.description == rule_setting.description and
610 rule.direction == rule_setting.direction.name and
611 rule.ethertype == setting_eth_type.name and
612 rule.port_range_max == rule_setting.port_range_max and
613 rule.port_range_min == rule_setting.port_range_min and
614 rule.protocol == setting_proto and
615 rule.remote_group_id == rule_setting.remote_group_id and
616 rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
617 rule.security_group_id == sec_grp.id):