1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (
20 SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
21 Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
22 from snaps.openstack.tests import validation_utils
23 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
24 from snaps.openstack.utils import neutron_utils
26 __author__ = 'spisarski'
29 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
31 Tests the construction of the SecurityGroupRuleSettings class
34 def test_no_params(self):
35 with self.assertRaises(SecurityGroupRuleSettingsError):
36 SecurityGroupRuleSettings()
38 def test_empty_config(self):
39 with self.assertRaises(SecurityGroupRuleSettingsError):
40 SecurityGroupRuleSettings(**dict())
42 def test_name_only(self):
43 with self.assertRaises(SecurityGroupRuleSettingsError):
44 SecurityGroupRuleSettings(sec_grp_name='foo')
46 def test_config_with_name_only(self):
47 with self.assertRaises(SecurityGroupRuleSettingsError):
48 SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
50 def test_name_and_direction(self):
51 settings = SecurityGroupRuleSettings(sec_grp_name='foo',
52 direction=Direction.ingress)
53 self.assertEqual('foo', settings.sec_grp_name)
54 self.assertEqual(Direction.ingress, settings.direction)
56 def test_config_name_and_direction(self):
57 settings = SecurityGroupRuleSettings(
58 **{'sec_grp_name': 'foo', 'direction': 'ingress'})
59 self.assertEqual('foo', settings.sec_grp_name)
60 self.assertEqual(Direction.ingress, settings.direction)
63 settings = SecurityGroupRuleSettings(
64 sec_grp_name='foo', description='fubar',
65 direction=Direction.egress, remote_group_id='rgi',
66 protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
68 remote_ip_prefix='prfx')
69 self.assertEqual('foo', settings.sec_grp_name)
70 self.assertEqual('fubar', settings.description)
71 self.assertEqual(Direction.egress, settings.direction)
72 self.assertEqual('rgi', settings.remote_group_id)
73 self.assertEqual(Protocol.icmp, settings.protocol)
74 self.assertEqual(Ethertype.IPv6, settings.ethertype)
75 self.assertEqual(1, settings.port_range_min)
76 self.assertEqual(2, settings.port_range_max)
77 self.assertEqual('prfx', settings.remote_ip_prefix)
79 def test_config_all(self):
80 settings = SecurityGroupRuleSettings(
81 **{'sec_grp_name': 'foo',
82 'description': 'fubar',
83 'direction': 'egress',
84 'remote_group_id': 'rgi',
89 'remote_ip_prefix': 'prfx'})
90 self.assertEqual('foo', settings.sec_grp_name)
91 self.assertEqual('fubar', settings.description)
92 self.assertEqual(Direction.egress, settings.direction)
93 self.assertEqual('rgi', settings.remote_group_id)
94 self.assertEqual(Protocol.tcp, settings.protocol)
95 self.assertEqual(Ethertype.IPv6, settings.ethertype)
96 self.assertEqual(1, settings.port_range_min)
97 self.assertEqual(2, settings.port_range_max)
98 self.assertEqual('prfx', settings.remote_ip_prefix)
101 class SecurityGroupSettingsUnitTests(unittest.TestCase):
103 Tests the construction of the SecurityGroupSettings class
106 def test_no_params(self):
107 with self.assertRaises(SecurityGroupSettingsError):
108 SecurityGroupSettings()
110 def test_empty_config(self):
111 with self.assertRaises(SecurityGroupSettingsError):
112 SecurityGroupSettings(**dict())
114 def test_name_only(self):
115 settings = SecurityGroupSettings(name='foo')
116 self.assertEqual('foo', settings.name)
118 def test_config_with_name_only(self):
119 settings = SecurityGroupSettings(**{'name': 'foo'})
120 self.assertEqual('foo', settings.name)
122 def test_invalid_rule(self):
123 rule_setting = SecurityGroupRuleSettings(
124 sec_grp_name='bar', direction=Direction.ingress,
125 description='test_rule_1')
126 with self.assertRaises(SecurityGroupSettingsError):
127 SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
130 rule_settings = list()
131 rule_settings.append(SecurityGroupRuleSettings(
132 sec_grp_name='bar', direction=Direction.egress,
133 description='test_rule_1'))
134 rule_settings.append(SecurityGroupRuleSettings(
135 sec_grp_name='bar', direction=Direction.ingress,
136 description='test_rule_2'))
137 settings = SecurityGroupSettings(
138 name='bar', description='fubar', project_name='foo',
139 rule_settings=rule_settings)
141 self.assertEqual('bar', settings.name)
142 self.assertEqual('fubar', settings.description)
143 self.assertEqual('foo', settings.project_name)
144 self.assertEqual(rule_settings[0], settings.rule_settings[0])
145 self.assertEqual(rule_settings[1], settings.rule_settings[1])
147 def test_config_all(self):
148 settings = SecurityGroupSettings(
150 'description': 'fubar',
151 'project_name': 'foo',
153 {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
155 self.assertEqual('bar', settings.name)
156 self.assertEqual('fubar', settings.description)
157 self.assertEqual('foo', settings.project_name)
158 self.assertEqual(1, len(settings.rule_settings))
159 self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
160 self.assertEqual(Direction.ingress,
161 settings.rule_settings[0].direction)
164 class CreateSecurityGroupTests(OSIntegrationTestCase):
166 Test for the CreateSecurityGroup class defined in create_security_group.py
171 Instantiates the CreateSecurityGroup object that is responsible for
172 downloading and creating an OS image file within OpenStack
174 super(self.__class__, self).__start__()
176 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
177 self.sec_grp_name = guid + 'name'
178 self.neutron = neutron_utils.neutron_client(self.os_creds)
180 # Initialize for cleanup
181 self.sec_grp_creator = None
185 Cleans the image and downloaded image file
187 if self.sec_grp_creator:
188 self.sec_grp_creator.clean()
190 super(self.__class__, self).__clean__()
192 def test_create_group_without_rules(self):
194 Tests the creation of an OpenStack Security Group without custom rules.
197 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
198 description='hello group')
199 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
200 self.os_creds, sec_grp_settings)
201 self.sec_grp_creator.create()
203 sec_grp = neutron_utils.get_security_group(
204 self.neutron, sec_grp_settings=sec_grp_settings)
205 self.assertIsNotNone(sec_grp)
207 validation_utils.objects_equivalent(
208 self.sec_grp_creator.get_security_group(), sec_grp)
209 rules = neutron_utils.get_rules_by_security_group(
210 self.neutron, self.sec_grp_creator.get_security_group())
211 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
212 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
217 self.neutron, self.sec_grp_creator.sec_grp_settings,
218 self.sec_grp_creator.get_security_group()))
220 def test_create_group_admin_user_to_new_project(self):
222 Tests the creation of an OpenStack Security Group without custom rules.
225 sec_grp_settings = SecurityGroupSettings(
226 name=self.sec_grp_name, description='hello group',
227 project_name=self.admin_os_creds.project_name)
228 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
229 self.os_creds, sec_grp_settings)
230 self.sec_grp_creator.create()
232 sec_grp = neutron_utils.get_security_group(
233 self.neutron, sec_grp_settings=sec_grp_settings)
234 self.assertIsNotNone(sec_grp)
236 validation_utils.objects_equivalent(
237 self.sec_grp_creator.get_security_group(), sec_grp)
238 rules = neutron_utils.get_rules_by_security_group(
239 self.neutron, self.sec_grp_creator.get_security_group())
240 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
241 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
246 self.neutron, self.sec_grp_creator.sec_grp_settings,
247 self.sec_grp_creator.get_security_group(), rules))
249 def test_create_group_new_user_to_admin_project(self):
251 Tests the creation of an OpenStack Security Group without custom rules.
254 sec_grp_settings = SecurityGroupSettings(
255 name=self.sec_grp_name, description='hello group',
256 project_name=self.os_creds.project_name)
257 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
258 self.admin_os_creds, sec_grp_settings)
259 self.sec_grp_creator.create()
261 sec_grp = neutron_utils.get_security_group(
262 self.neutron, sec_grp_settings=sec_grp_settings)
263 self.assertIsNotNone(sec_grp)
265 validation_utils.objects_equivalent(
266 self.sec_grp_creator.get_security_group(), sec_grp)
267 rules = neutron_utils.get_rules_by_security_group(
268 self.neutron, self.sec_grp_creator.get_security_group())
269 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
270 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
275 self.neutron, self.sec_grp_creator.sec_grp_settings,
276 self.sec_grp_creator.get_security_group(), rules))
278 def test_create_delete_group(self):
280 Tests the creation of an OpenStack Security Group without custom rules.
283 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
284 description='hello group')
285 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
286 self.os_creds, sec_grp_settings)
287 created_sec_grp = self.sec_grp_creator.create()
288 self.assertIsNotNone(created_sec_grp)
292 self.neutron, self.sec_grp_creator.sec_grp_settings,
293 self.sec_grp_creator.get_security_group()))
295 neutron_utils.delete_security_group(self.neutron, created_sec_grp)
296 self.assertIsNone(neutron_utils.get_security_group(
298 sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
300 self.sec_grp_creator.clean()
302 def test_create_group_with_one_simple_rule(self):
304 Tests the creation of an OpenStack Security Group with one simple
308 sec_grp_rule_settings = list()
309 sec_grp_rule_settings.append(
310 SecurityGroupRuleSettings(
311 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
312 description='test_rule_1'))
313 sec_grp_settings = SecurityGroupSettings(
314 name=self.sec_grp_name, description='hello group',
315 rule_settings=sec_grp_rule_settings)
316 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
317 self.os_creds, sec_grp_settings)
318 self.sec_grp_creator.create()
320 sec_grp = neutron_utils.get_security_group(
321 self.neutron, sec_grp_settings=sec_grp_settings)
322 validation_utils.objects_equivalent(
323 self.sec_grp_creator.get_security_group(), sec_grp)
324 rules = neutron_utils.get_rules_by_security_group(
325 self.neutron, self.sec_grp_creator.get_security_group())
326 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
327 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
332 self.neutron, self.sec_grp_creator.sec_grp_settings,
333 self.sec_grp_creator.get_security_group(), rules))
335 def test_create_group_with_one_complex_rule(self):
337 Tests the creation of an OpenStack Security Group with one simple
341 sec_grp_rule_settings = list()
342 sec_grp_rule_settings.append(
343 SecurityGroupRuleSettings(
344 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
345 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
346 port_range_min=10, port_range_max=20,
347 description='test_rule_1'))
348 sec_grp_settings = SecurityGroupSettings(
349 name=self.sec_grp_name, description='hello group',
350 rule_settings=sec_grp_rule_settings)
351 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
352 self.os_creds, sec_grp_settings)
353 self.sec_grp_creator.create()
355 sec_grp = neutron_utils.get_security_group(
356 self.neutron, sec_grp_settings=sec_grp_settings)
357 validation_utils.objects_equivalent(
358 self.sec_grp_creator.get_security_group(), sec_grp)
359 rules = neutron_utils.get_rules_by_security_group(
360 self.neutron, self.sec_grp_creator.get_security_group())
361 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
362 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
367 self.neutron, self.sec_grp_creator.sec_grp_settings,
368 self.sec_grp_creator.get_security_group(), rules))
370 def test_create_group_with_several_rules(self):
372 Tests the creation of an OpenStack Security Group with one simple
376 sec_grp_rule_settings = list()
377 sec_grp_rule_settings.append(
378 SecurityGroupRuleSettings(
379 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
380 description='test_rule_1'))
381 sec_grp_rule_settings.append(
382 SecurityGroupRuleSettings(
383 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
384 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
385 description='test_rule_2'))
386 sec_grp_rule_settings.append(
387 SecurityGroupRuleSettings(
388 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
389 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
390 port_range_min=10, port_range_max=20,
391 description='test_rule_3'))
392 sec_grp_settings = SecurityGroupSettings(
393 name=self.sec_grp_name, description='hello group',
394 rule_settings=sec_grp_rule_settings)
395 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
396 self.os_creds, sec_grp_settings)
397 self.sec_grp_creator.create()
399 sec_grp = neutron_utils.get_security_group(
400 self.neutron, sec_grp_settings=sec_grp_settings)
401 validation_utils.objects_equivalent(
402 self.sec_grp_creator.get_security_group(), sec_grp)
403 rules = neutron_utils.get_rules_by_security_group(
404 self.neutron, self.sec_grp_creator.get_security_group())
405 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
406 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
411 self.neutron, self.sec_grp_creator.sec_grp_settings,
412 self.sec_grp_creator.get_security_group(), rules))
414 def test_add_rule(self):
416 Tests the creation of an OpenStack Security Group with one simple
417 custom rule then adds one after creation.
420 sec_grp_rule_settings = list()
421 sec_grp_rule_settings.append(
422 SecurityGroupRuleSettings(
423 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
424 description='test_rule_1'))
425 sec_grp_settings = SecurityGroupSettings(
426 name=self.sec_grp_name, description='hello group',
427 rule_settings=sec_grp_rule_settings)
428 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
429 self.os_creds, sec_grp_settings)
430 self.sec_grp_creator.create()
432 sec_grp = neutron_utils.get_security_group(
433 self.neutron, sec_grp_settings=sec_grp_settings)
434 validation_utils.objects_equivalent(
435 self.sec_grp_creator.get_security_group(), sec_grp)
437 rules = neutron_utils.get_rules_by_security_group(
438 self.neutron, self.sec_grp_creator.get_security_group())
442 self.neutron, self.sec_grp_creator.sec_grp_settings,
443 self.sec_grp_creator.get_security_group(), rules))
445 rules = neutron_utils.get_rules_by_security_group(
446 self.neutron, self.sec_grp_creator.get_security_group())
447 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
448 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
451 self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
452 sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
453 direction=Direction.egress, protocol=Protocol.icmp,
454 description='test_rule_2'))
455 rules2 = neutron_utils.get_rules_by_security_group(
456 self.neutron, self.sec_grp_creator.get_security_group())
457 self.assertEqual(len(rules) + 1, len(rules2))
459 def test_remove_rule_by_id(self):
461 Tests the creation of an OpenStack Security Group with two simple
462 custom rules then removes one by the rule ID.
465 sec_grp_rule_settings = list()
466 sec_grp_rule_settings.append(
467 SecurityGroupRuleSettings(
468 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
469 description='test_rule_1'))
470 sec_grp_rule_settings.append(
471 SecurityGroupRuleSettings(
472 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
473 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
474 description='test_rule_2'))
475 sec_grp_rule_settings.append(
476 SecurityGroupRuleSettings(
477 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
478 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
479 port_range_min=10, port_range_max=20,
480 description='test_rule_3'))
481 sec_grp_settings = SecurityGroupSettings(
482 name=self.sec_grp_name, description='hello group',
483 rule_settings=sec_grp_rule_settings)
484 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
485 self.os_creds, sec_grp_settings)
486 self.sec_grp_creator.create()
488 sec_grp = neutron_utils.get_security_group(
489 self.neutron, sec_grp_settings=sec_grp_settings)
490 validation_utils.objects_equivalent(
491 self.sec_grp_creator.get_security_group(), sec_grp)
492 rules = neutron_utils.get_rules_by_security_group(
493 self.neutron, self.sec_grp_creator.get_security_group())
494 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
495 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
500 self.neutron, self.sec_grp_creator.sec_grp_settings,
501 self.sec_grp_creator.get_security_group(), rules))
503 self.sec_grp_creator.remove_rule(
505 rules_after_del = neutron_utils.get_rules_by_security_group(
507 self.sec_grp_creator.get_security_group())
508 self.assertEqual(len(rules) - 1, len(rules_after_del))
510 def test_remove_rule_by_setting(self):
512 Tests the creation of an OpenStack Security Group with two simple
513 custom rules then removes one by the rule setting object
516 sec_grp_rule_settings = list()
517 sec_grp_rule_settings.append(
518 SecurityGroupRuleSettings(
519 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
520 description='test_rule_1'))
521 sec_grp_rule_settings.append(
522 SecurityGroupRuleSettings(
523 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
524 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
525 description='test_rule_2'))
526 sec_grp_rule_settings.append(
527 SecurityGroupRuleSettings(
528 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
529 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
530 port_range_min=10, port_range_max=20,
531 description='test_rule_3'))
532 sec_grp_settings = SecurityGroupSettings(
533 name=self.sec_grp_name, description='hello group',
534 rule_settings=sec_grp_rule_settings)
535 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
536 self.os_creds, sec_grp_settings)
537 self.sec_grp_creator.create()
539 sec_grp = neutron_utils.get_security_group(
540 self.neutron, sec_grp_settings=sec_grp_settings)
541 validation_utils.objects_equivalent(
542 self.sec_grp_creator.get_security_group(), sec_grp)
544 rules = neutron_utils.get_rules_by_security_group(
545 self.neutron, self.sec_grp_creator.get_security_group())
546 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
547 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
552 self.neutron, self.sec_grp_creator.sec_grp_settings,
553 self.sec_grp_creator.get_security_group(), rules))
555 self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
556 rules_after_del = neutron_utils.get_rules_by_security_group(
558 self.sec_grp_creator.get_security_group())
559 self.assertEqual(len(rules) - 1, len(rules_after_del))
562 def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
564 Returns True is the settings on a security group are properly contained
565 on the SNAPS SecurityGroup domain object
566 :param neutron: the neutron client
567 :param sec_grp_settings: the security group configuration
568 :param sec_grp: the SNAPS-OO security group object
569 :param rules: collection of SNAPS-OO security group rule objects
572 return (sec_grp.description == sec_grp_settings.description and
573 sec_grp.name == sec_grp_settings.name and
574 validate_sec_grp_rules(
575 neutron, sec_grp_settings.rule_settings, rules))
578 def validate_sec_grp_rules(neutron, rule_settings, rules):
580 Returns True is the settings on a security group rule are properly
581 contained on the SNAPS SecurityGroupRule domain object.
582 This function will only operate on rules that contain a description as
583 this is the only means to tell if the rule is custom or defaulted by
585 :param neutron: the neutron client
586 :param rule_settings: collection of SecurityGroupRuleSettings objects
587 :param rules: a collection of SecurityGroupRule domain objects
591 for rule_setting in rule_settings:
592 if rule_setting.description:
595 if rule_setting.protocol == Protocol.null:
598 setting_proto = rule_setting.protocol.name
600 sec_grp = neutron_utils.get_security_group(
601 neutron, sec_grp_name=rule_setting.sec_grp_name)
603 setting_eth_type = create_security_group.Ethertype.IPv4
604 if rule_setting.ethertype:
605 setting_eth_type = rule_setting.ethertype
610 if (rule.description == rule_setting.description and
611 rule.direction == rule_setting.direction.name and
612 rule.ethertype == setting_eth_type.name and
613 rule.port_range_max == rule_setting.port_range_max and
614 rule.port_range_min == rule_setting.port_range_min and
615 rule.protocol == setting_proto and
616 rule.remote_group_id == rule_setting.remote_group_id and
617 rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
618 rule.security_group_id == sec_grp.id):