1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.config.security_group import (
19 SecurityGroupConfig, SecurityGroupRuleConfig,
20 SecurityGroupRuleConfigError, SecurityGroupConfigError)
21 from snaps.openstack import create_security_group
22 from snaps.openstack.create_security_group import (
23 SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
24 Protocol, OpenStackSecurityGroup)
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils
29 __author__ = 'spisarski'
32 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
34 Tests the construction of the SecurityGroupRuleSettings class
def test_no_params(self):
    """
    Constructing a rule without any arguments must raise
    SecurityGroupRuleConfigError
    """
    self.assertRaises(
        SecurityGroupRuleConfigError, SecurityGroupRuleSettings)
def test_empty_config(self):
    """
    Constructing a rule from an empty configuration dict must raise
    SecurityGroupRuleConfigError
    """
    empty_config = dict()
    self.assertRaises(
        SecurityGroupRuleConfigError, SecurityGroupRuleSettings,
        **empty_config)
def test_name_only(self):
    """
    A rule given only the security group name (no direction) must raise
    SecurityGroupRuleConfigError
    """
    self.assertRaises(
        SecurityGroupRuleConfigError, SecurityGroupRuleSettings,
        sec_grp_name='foo')
def test_config_with_name_only(self):
    """
    A rule configuration dict holding only the security group name must
    raise SecurityGroupRuleConfigError
    """
    name_only_config = {'sec_grp_name': 'foo'}
    self.assertRaises(
        SecurityGroupRuleConfigError, SecurityGroupRuleSettings,
        **name_only_config)
def test_name_and_direction(self):
    """
    Builds a rule from keyword arguments carrying only the group name and
    the direction and checks both are exposed on the resulting object
    """
    rule = SecurityGroupRuleSettings(
        direction=Direction.ingress, sec_grp_name='foo')

    self.assertEqual('foo', rule.sec_grp_name)
    self.assertEqual(Direction.ingress.value, rule.direction.value)
def test_config_name_and_direction(self):
    """
    Builds a rule from a configuration dict carrying only the group name
    and the direction (as a string) and checks both on the result
    """
    rule_config = {'sec_grp_name': 'foo', 'direction': 'ingress'}
    rule = SecurityGroupRuleSettings(**rule_config)

    self.assertEqual('foo', rule.sec_grp_name)
    self.assertEqual(Direction.ingress.value, rule.direction.value)
def test_proto_ah_str(self):
    """
    Checks that a protocol supplied as the string 'ah' in a configuration
    dict is exposed as Protocol.ah on the resulting rule
    """
    # The 'protocol' entry is what the final assertion depends on; the
    # dict must carry it and be closed before the assertions run.
    settings = SecurityGroupRuleSettings(
        **{'sec_grp_name': 'foo', 'direction': 'ingress',
           'protocol': 'ah'})
    self.assertEqual('foo', settings.sec_grp_name)
    self.assertEqual(Direction.ingress.value, settings.direction.value)
    self.assertEqual(Protocol.ah.value, settings.protocol.value)
def test_proto_ah_value(self):
    """
    Checks that a protocol supplied as the raw value of Protocol.ah in a
    configuration dict is exposed as Protocol.ah on the resulting rule
    """
    # Use the enum's own value rather than a literal so the test stays
    # tied to the Protocol definition it asserts against.
    settings = SecurityGroupRuleSettings(
        **{'sec_grp_name': 'foo', 'direction': 'ingress',
           'protocol': Protocol.ah.value})
    self.assertEqual('foo', settings.sec_grp_name)
    self.assertEqual(Direction.ingress.value, settings.direction.value)
    self.assertEqual(Protocol.ah.value, settings.protocol.value)
def test_proto_any(self):
    """
    Checks that a protocol supplied as the string 'any' in a configuration
    dict is exposed as Protocol.null on the resulting rule
    """
    settings = SecurityGroupRuleSettings(
        **{'sec_grp_name': 'foo', 'direction': 'ingress',
           'protocol': 'any'})
    self.assertEqual('foo', settings.sec_grp_name)
    self.assertEqual(Direction.ingress.value, settings.direction.value)
    self.assertEqual(Protocol.null.value, settings.protocol.value)
def test_proto_null(self):
    """
    Checks that a protocol supplied as the string 'null' in a configuration
    dict is exposed as Protocol.null on the resulting rule
    """
    settings = SecurityGroupRuleSettings(
        **{'sec_grp_name': 'foo', 'direction': 'ingress',
           'protocol': 'null'})
    self.assertEqual('foo', settings.sec_grp_name)
    self.assertEqual(Direction.ingress.value, settings.direction.value)
    self.assertEqual(Protocol.null.value, settings.protocol.value)
98 settings = SecurityGroupRuleSettings(
99 sec_grp_name='foo', description='fubar',
100 direction=Direction.egress, remote_group_id='rgi',
101 protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
103 remote_ip_prefix='prfx')
104 self.assertEqual('foo', settings.sec_grp_name)
105 self.assertEqual('fubar', settings.description)
106 self.assertEqual(Direction.egress.value, settings.direction.value)
107 self.assertEqual('rgi', settings.remote_group_id)
108 self.assertEqual(Protocol.icmp.value, settings.protocol.value)
109 self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
110 self.assertEqual(1, settings.port_range_min)
111 self.assertEqual(2, settings.port_range_max)
112 self.assertEqual('prfx', settings.remote_ip_prefix)
def test_config_all(self):
    """
    Builds a rule from a configuration dict that sets every supported
    field and checks each one on the resulting object
    """
    # Every attribute asserted below needs a matching entry in the dict;
    # enum-backed fields are given by member name, like 'direction'.
    settings = SecurityGroupRuleSettings(
        **{'sec_grp_name': 'foo',
           'description': 'fubar',
           'direction': 'egress',
           'remote_group_id': 'rgi',
           'protocol': 'tcp',
           'ethertype': 'IPv6',
           'port_range_min': 1,
           'port_range_max': 2,
           'remote_ip_prefix': 'prfx'})
    self.assertEqual('foo', settings.sec_grp_name)
    self.assertEqual('fubar', settings.description)
    self.assertEqual(Direction.egress.value, settings.direction.value)
    self.assertEqual('rgi', settings.remote_group_id)
    self.assertEqual(Protocol.tcp.value, settings.protocol.value)
    self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
    self.assertEqual(1, settings.port_range_min)
    self.assertEqual(2, settings.port_range_max)
    self.assertEqual('prfx', settings.remote_ip_prefix)
136 class SecurityGroupSettingsUnitTests(unittest.TestCase):
138 Tests the construction of the SecurityGroupSettings class
def test_no_params(self):
    """
    Constructing a security group without any arguments must raise
    SecurityGroupConfigError
    """
    self.assertRaises(SecurityGroupConfigError, SecurityGroupSettings)
def test_empty_config(self):
    """
    Constructing a security group from an empty configuration dict must
    raise SecurityGroupConfigError
    """
    empty_config = dict()
    self.assertRaises(
        SecurityGroupConfigError, SecurityGroupSettings, **empty_config)
def test_name_only(self):
    """
    A security group built from just a name keyword exposes that name
    """
    group = SecurityGroupSettings(name='foo')
    self.assertEqual('foo', group.name)
def test_config_with_name_only(self):
    """
    A security group built from a configuration dict holding just a name
    exposes that name
    """
    name_only_config = {'name': 'foo'}
    group = SecurityGroupSettings(**name_only_config)
    self.assertEqual('foo', group.name)
def test_invalid_rule(self):
    """
    Expects SecurityGroupConfigError when a group named 'foo' is given a
    rule whose sec_grp_name is 'bar'
    """
    # The rule itself is built outside the assertion so that only the
    # group construction is expected to raise.
    mismatched_rule = SecurityGroupRuleSettings(
        sec_grp_name='bar', direction=Direction.ingress,
        description='test_rule_1')
    self.assertRaises(
        SecurityGroupConfigError, SecurityGroupSettings,
        name='foo', rule_settings=[mismatched_rule])
165 rule_settings = list()
166 rule_settings.append(SecurityGroupRuleSettings(
167 sec_grp_name='bar', direction=Direction.egress,
168 description='test_rule_1'))
169 rule_settings.append(SecurityGroupRuleSettings(
170 sec_grp_name='bar', direction=Direction.ingress,
171 description='test_rule_2'))
172 settings = SecurityGroupSettings(
173 name='bar', description='fubar', project_name='foo',
174 rule_settings=rule_settings)
176 self.assertEqual('bar', settings.name)
177 self.assertEqual('fubar', settings.description)
178 self.assertEqual('foo', settings.project_name)
179 self.assertEqual(rule_settings[0], settings.rule_settings[0])
180 self.assertEqual(rule_settings[1], settings.rule_settings[1])
182 def test_config_all(self):
183 settings = SecurityGroupSettings(
185 'description': 'fubar',
186 'project_name': 'foo',
188 {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
190 self.assertEqual('bar', settings.name)
191 self.assertEqual('fubar', settings.description)
192 self.assertEqual('foo', settings.project_name)
193 self.assertEqual(1, len(settings.rule_settings))
194 self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
195 self.assertEqual(Direction.ingress.value,
196 settings.rule_settings[0].direction.value)
199 class CreateSecurityGroupTests(OSIntegrationTestCase):
201 Test for the CreateSecurityGroup class defined in create_security_group.py
Prepares a unique security group name and a neutron client, and
initializes the security group creator to None so cleanup is safe
209 super(self.__class__, self).__start__()
211 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
212 self.sec_grp_name = guid + 'name'
213 self.neutron = neutron_utils.neutron_client(self.os_creds)
215 # Initialize for cleanup
216 self.sec_grp_creator = None
Cleans up the security group created by the test, if any
222 if self.sec_grp_creator:
223 self.sec_grp_creator.clean()
225 super(self.__class__, self).__clean__()
227 def test_create_group_without_rules(self):
229 Tests the creation of an OpenStack Security Group without custom rules.
231 # Create Security Group
232 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
233 description='hello group')
234 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
235 self.os_creds, sec_grp_settings)
236 self.sec_grp_creator.create()
238 sec_grp = neutron_utils.get_security_group(
239 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
240 self.assertIsNotNone(sec_grp)
242 validation_utils.objects_equivalent(
243 self.sec_grp_creator.get_security_group(), sec_grp)
244 rules = neutron_utils.get_rules_by_security_group(
245 self.neutron, self.sec_grp_creator.get_security_group())
246 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
247 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
252 self.neutron, self.keystone,
253 self.sec_grp_creator.sec_grp_settings,
254 self.sec_grp_creator.get_security_group()))
256 def test_create_group_admin_user_to_new_project(self):
258 Tests the creation of an OpenStack Security Group without custom rules.
260 # Create Security Group
261 sec_grp_settings = SecurityGroupConfig(
262 name=self.sec_grp_name, description='hello group',
263 project_name=self.os_creds.project_name)
264 self.sec_grp_creator = OpenStackSecurityGroup(
265 self.admin_os_creds, sec_grp_settings)
266 self.sec_grp_creator.create()
268 sec_grp = neutron_utils.get_security_group(
269 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
270 self.assertIsNotNone(sec_grp)
272 validation_utils.objects_equivalent(
273 self.sec_grp_creator.get_security_group(), sec_grp)
274 rules = neutron_utils.get_rules_by_security_group(
275 self.neutron, self.sec_grp_creator.get_security_group())
276 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
277 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
282 self.neutron, self.keystone,
283 self.sec_grp_creator.sec_grp_settings,
284 self.sec_grp_creator.get_security_group(), rules))
286 self.assertEqual(self.sec_grp_creator.get_security_group().id,
289 proj_creator = OpenStackSecurityGroup(
290 self.os_creds, SecurityGroupConfig(name=self.sec_grp_name))
291 proj_creator.create()
293 self.assertEqual(self.sec_grp_creator.get_security_group().id,
294 proj_creator.get_security_group().id)
296 def test_create_group_new_user_to_admin_project(self):
298 Tests the creation of an OpenStack Security Group without custom rules.
300 # Create Security Group
301 sec_grp_settings = SecurityGroupConfig(
302 name=self.sec_grp_name, description='hello group',
303 project_name=self.os_creds.project_name)
304 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
305 self.admin_os_creds, sec_grp_settings)
306 self.sec_grp_creator.create()
308 sec_grp = neutron_utils.get_security_group(
309 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
310 self.assertIsNotNone(sec_grp)
312 validation_utils.objects_equivalent(
313 self.sec_grp_creator.get_security_group(), sec_grp)
314 rules = neutron_utils.get_rules_by_security_group(
315 self.neutron, self.sec_grp_creator.get_security_group())
316 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
317 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
322 self.neutron, self.keystone,
323 self.sec_grp_creator.sec_grp_settings,
324 self.sec_grp_creator.get_security_group(), rules))
326 def test_create_delete_group(self):
328 Tests the creation of an OpenStack Security Group without custom rules.
330 # Create Security Group
331 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
332 description='hello group')
333 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
334 self.os_creds, sec_grp_settings)
335 created_sec_grp = self.sec_grp_creator.create()
336 self.assertIsNotNone(created_sec_grp)
340 self.neutron, self.keystone,
341 self.sec_grp_creator.sec_grp_settings,
342 self.sec_grp_creator.get_security_group()))
344 neutron_utils.delete_security_group(self.neutron, created_sec_grp)
345 self.assertIsNone(neutron_utils.get_security_group(
346 self.neutron, self.keystone,
347 sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
349 self.sec_grp_creator.clean()
351 def test_create_group_with_one_simple_rule(self):
353 Tests the creation of an OpenStack Security Group with one simple
356 # Create Security Group
357 sec_grp_rule_settings = list()
358 sec_grp_rule_settings.append(
359 SecurityGroupRuleConfig(
360 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
361 description='test_rule_1'))
362 sec_grp_settings = SecurityGroupConfig(
363 name=self.sec_grp_name, description='hello group',
364 rule_settings=sec_grp_rule_settings)
365 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
366 self.os_creds, sec_grp_settings)
367 self.sec_grp_creator.create()
369 sec_grp = neutron_utils.get_security_group(
370 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
371 validation_utils.objects_equivalent(
372 self.sec_grp_creator.get_security_group(), sec_grp)
373 rules = neutron_utils.get_rules_by_security_group(
374 self.neutron, self.sec_grp_creator.get_security_group())
375 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
376 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
381 self.neutron, self.keystone,
382 self.sec_grp_creator.sec_grp_settings,
383 self.sec_grp_creator.get_security_group(), rules))
385 def test_create_group_with_one_complex_rule(self):
Tests the creation of an OpenStack Security Group with one complex
390 # Create Security Group
391 sec_grp_rule_settings = list()
392 sec_grp_rule_settings.append(
393 SecurityGroupRuleConfig(
394 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
395 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
396 port_range_min=10, port_range_max=20,
397 description='test_rule_1'))
398 sec_grp_settings = SecurityGroupConfig(
399 name=self.sec_grp_name, description='hello group',
400 rule_settings=sec_grp_rule_settings)
401 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
402 self.os_creds, sec_grp_settings)
403 self.sec_grp_creator.create()
405 sec_grp = neutron_utils.get_security_group(
406 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
407 validation_utils.objects_equivalent(
408 self.sec_grp_creator.get_security_group(), sec_grp)
409 rules = neutron_utils.get_rules_by_security_group(
410 self.neutron, self.sec_grp_creator.get_security_group())
411 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
412 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
417 self.neutron, self.keystone,
418 self.sec_grp_creator.sec_grp_settings,
419 self.sec_grp_creator.get_security_group(), rules))
421 def test_create_group_with_several_rules(self):
423 Tests the creation of an OpenStack Security Group with one simple
426 # Create Security Group
427 sec_grp_rule_settings = list()
428 sec_grp_rule_settings.append(
429 SecurityGroupRuleConfig(
430 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
431 description='test_rule_1'))
432 sec_grp_rule_settings.append(
433 SecurityGroupRuleConfig(
434 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
435 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
436 description='test_rule_2'))
437 sec_grp_rule_settings.append(
438 SecurityGroupRuleConfig(
439 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
440 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
441 port_range_min=10, port_range_max=20,
442 description='test_rule_3'))
443 sec_grp_settings = SecurityGroupConfig(
444 name=self.sec_grp_name, description='hello group',
445 rule_settings=sec_grp_rule_settings)
446 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
447 self.os_creds, sec_grp_settings)
448 self.sec_grp_creator.create()
450 sec_grp = neutron_utils.get_security_group(
451 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
452 validation_utils.objects_equivalent(
453 self.sec_grp_creator.get_security_group(), sec_grp)
454 rules = neutron_utils.get_rules_by_security_group(
455 self.neutron, self.sec_grp_creator.get_security_group())
456 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
457 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
462 self.neutron, self.keystone,
463 self.sec_grp_creator.sec_grp_settings,
464 self.sec_grp_creator.get_security_group(), rules))
466 def test_add_rule(self):
468 Tests the creation of an OpenStack Security Group with one simple
469 custom rule then adds one after creation.
471 # Create Security Group
472 sec_grp_rule_settings = list()
473 sec_grp_rule_settings.append(
474 SecurityGroupRuleConfig(
475 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
476 description='test_rule_1'))
477 sec_grp_settings = SecurityGroupConfig(
478 name=self.sec_grp_name, description='hello group',
479 rule_settings=sec_grp_rule_settings)
480 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
481 self.os_creds, sec_grp_settings)
482 self.sec_grp_creator.create()
484 sec_grp = neutron_utils.get_security_group(
485 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
486 validation_utils.objects_equivalent(
487 self.sec_grp_creator.get_security_group(), sec_grp)
489 rules = neutron_utils.get_rules_by_security_group(
490 self.neutron, self.sec_grp_creator.get_security_group())
494 self.neutron, self.keystone,
495 self.sec_grp_creator.sec_grp_settings,
496 self.sec_grp_creator.get_security_group(), rules))
498 rules = neutron_utils.get_rules_by_security_group(
499 self.neutron, self.sec_grp_creator.get_security_group())
500 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
501 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
504 self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
505 sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
506 direction=Direction.egress, protocol=Protocol.icmp,
507 description='test_rule_2'))
508 rules2 = neutron_utils.get_rules_by_security_group(
509 self.neutron, self.sec_grp_creator.get_security_group())
510 self.assertEqual(len(rules) + 1, len(rules2))
512 def test_remove_rule_by_id(self):
514 Tests the creation of an OpenStack Security Group with two simple
515 custom rules then removes one by the rule ID.
517 # Create Security Group
518 sec_grp_rule_settings = list()
519 sec_grp_rule_settings.append(
520 SecurityGroupRuleConfig(
521 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
522 description='test_rule_1'))
523 sec_grp_rule_settings.append(
524 SecurityGroupRuleConfig(
525 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
526 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
527 description='test_rule_2'))
528 sec_grp_rule_settings.append(
529 SecurityGroupRuleConfig(
530 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
531 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
532 port_range_min=10, port_range_max=20,
533 description='test_rule_3'))
534 sec_grp_settings = SecurityGroupConfig(
535 name=self.sec_grp_name, description='hello group',
536 rule_settings=sec_grp_rule_settings)
537 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
538 self.os_creds, sec_grp_settings)
539 self.sec_grp_creator.create()
541 sec_grp = neutron_utils.get_security_group(
542 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
543 validation_utils.objects_equivalent(
544 self.sec_grp_creator.get_security_group(), sec_grp)
545 rules = neutron_utils.get_rules_by_security_group(
546 self.neutron, self.sec_grp_creator.get_security_group())
547 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
548 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
553 self.neutron, self.keystone,
554 self.sec_grp_creator.sec_grp_settings,
555 self.sec_grp_creator.get_security_group(), rules))
557 self.sec_grp_creator.remove_rule(
559 rules_after_del = neutron_utils.get_rules_by_security_group(
561 self.sec_grp_creator.get_security_group())
562 self.assertEqual(len(rules) - 1, len(rules_after_del))
564 def test_remove_rule_by_setting(self):
566 Tests the creation of an OpenStack Security Group with two simple
567 custom rules then removes one by the rule setting object
569 # Create Security Group
570 sec_grp_rule_settings = list()
571 sec_grp_rule_settings.append(
572 SecurityGroupRuleConfig(
573 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
574 description='test_rule_1'))
575 sec_grp_rule_settings.append(
576 SecurityGroupRuleConfig(
577 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
578 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
579 description='test_rule_2'))
580 sec_grp_rule_settings.append(
581 SecurityGroupRuleConfig(
582 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
583 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
584 port_range_min=10, port_range_max=20,
585 description='test_rule_3'))
586 sec_grp_settings = SecurityGroupConfig(
587 name=self.sec_grp_name, description='hello group',
588 rule_settings=sec_grp_rule_settings)
589 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
590 self.os_creds, sec_grp_settings)
591 self.sec_grp_creator.create()
593 sec_grp = neutron_utils.get_security_group(
594 self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
595 validation_utils.objects_equivalent(
596 self.sec_grp_creator.get_security_group(), sec_grp)
598 rules = neutron_utils.get_rules_by_security_group(
599 self.neutron, self.sec_grp_creator.get_security_group())
600 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
601 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
606 self.neutron, self.keystone,
607 self.sec_grp_creator.sec_grp_settings,
608 self.sec_grp_creator.get_security_group(), rules))
610 self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
611 rules_after_del = neutron_utils.get_rules_by_security_group(
613 self.sec_grp_creator.get_security_group())
614 self.assertEqual(len(rules) - 1, len(rules_after_del))
617 def validate_sec_grp(neutron, keystone, sec_grp_settings, sec_grp,
Returns True if the settings on a security group are properly contained
621 on the SNAPS SecurityGroup domain object
622 :param neutron: the neutron client
623 :param keystone: the keystone client
624 :param sec_grp_settings: the security group configuration
625 :param sec_grp: the SNAPS-OO security group object
626 :param rules: collection of SNAPS-OO security group rule objects
629 return (sec_grp.description == sec_grp_settings.description and
630 sec_grp.name == sec_grp_settings.name and
631 validate_sec_grp_rules(
632 neutron, keystone, sec_grp_settings.rule_settings, rules))
635 def validate_sec_grp_rules(neutron, keystone, rule_settings, rules):
Returns True if the settings on a security group rule are properly
638 contained on the SNAPS SecurityGroupRule domain object.
639 This function will only operate on rules that contain a description as
640 this is the only means to tell if the rule is custom or defaulted by
642 :param neutron: the neutron client
643 :param keystone: the keystone client
644 :param rule_settings: collection of SecurityGroupRuleConfig objects
645 :param rules: a collection of SecurityGroupRule domain objects
649 for rule_setting in rule_settings:
650 if rule_setting.description:
653 sec_grp = neutron_utils.get_security_group(
654 neutron, keystone, sec_grp_name=rule_setting.sec_grp_name)
656 setting_eth_type = create_security_group.Ethertype.IPv4
657 if rule_setting.ethertype:
658 setting_eth_type = rule_setting.ethertype
665 proto_str = rule.protocol
667 if (rule.description == rule_setting.description and
668 rule.direction == rule_setting.direction.name and
669 rule.ethertype == setting_eth_type.name and
670 rule.port_range_max == rule_setting.port_range_max and
671 rule.port_range_min == rule_setting.port_range_min and
672 proto_str == str(rule_setting.protocol.value) and
673 rule.remote_group_id == rule_setting.remote_group_id and
674 rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
675 rule.security_group_id == sec_grp.id):
685 class CreateMultipleSecurityGroupTests(OSIntegrationTestCase):
687 Test for the CreateSecurityGroup class and how it interacts with security
688 groups within other projects with the same name
Prepares a unique security group name and a neutron client, then creates
a security group with that name using the admin credentials
696 super(self.__class__, self).__start__()
698 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
699 self.sec_grp_name = guid + 'name'
700 self.neutron = neutron_utils.neutron_client(self.os_creds)
702 # Initialize for cleanup
703 self.admin_sec_grp_config = SecurityGroupConfig(
704 name=self.sec_grp_name, description='hello group')
705 self.sec_grp_creator_admin = OpenStackSecurityGroup(
706 self.admin_os_creds, self.admin_sec_grp_config)
707 self.sec_grp_creator_admin.create()
708 self.sec_grp_creator_proj = None
Cleans up the security groups created with the admin and project credentials
714 if self.sec_grp_creator_admin:
715 self.sec_grp_creator_admin.clean()
716 if self.sec_grp_creator_proj:
717 self.sec_grp_creator_proj.clean()
719 super(self.__class__, self).__clean__()
721 def test_sec_grp_same_name_diff_proj(self):
723 Tests the creation of an OpenStack Security Group with the same name
724 within a different project/tenant.
726 # Create Security Group
727 sec_grp_config = SecurityGroupConfig(
728 name=self.sec_grp_name, description='hello group')
729 self.sec_grp_creator_proj = OpenStackSecurityGroup(
730 self.os_creds, sec_grp_config)
731 self.sec_grp_creator_proj.create()
734 self.sec_grp_creator_admin.get_security_group().id,
735 self.sec_grp_creator_proj.get_security_group().id)
737 admin_sec_grp_creator = OpenStackSecurityGroup(
738 self.admin_os_creds, self.admin_sec_grp_config)
739 admin_sec_grp_creator.create()
740 self.assertEqual(self.sec_grp_creator_admin.get_security_group().id,
741 admin_sec_grp_creator.get_security_group().id)
743 proj_sec_grp_creator = OpenStackSecurityGroup(
744 self.os_creds, sec_grp_config)
745 proj_sec_grp_creator.create()
746 self.assertEqual(self.sec_grp_creator_proj.get_security_group().id,
747 proj_sec_grp_creator.get_security_group().id)