1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.config.security_group import (
19 SecurityGroupConfig, SecurityGroupRuleConfig,
20 SecurityGroupRuleConfigError, SecurityGroupConfigError)
21 from snaps.openstack import create_security_group
22 from snaps.openstack.create_security_group import (
23 SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
24 Protocol, OpenStackSecurityGroup)
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils
29 __author__ = 'spisarski'
32 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
34 Tests the construction of the SecurityGroupRuleSettings class
37 def test_no_params(self):
38 with self.assertRaises(SecurityGroupRuleConfigError):
39 SecurityGroupRuleSettings()
41 def test_empty_config(self):
42 with self.assertRaises(SecurityGroupRuleConfigError):
43 SecurityGroupRuleSettings(**dict())
45 def test_name_only(self):
46 with self.assertRaises(SecurityGroupRuleConfigError):
47 SecurityGroupRuleSettings(sec_grp_name='foo')
49 def test_config_with_name_only(self):
50 with self.assertRaises(SecurityGroupRuleConfigError):
51 SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
53 def test_name_and_direction(self):
54 settings = SecurityGroupRuleSettings(sec_grp_name='foo',
55 direction=Direction.ingress)
56 self.assertEqual('foo', settings.sec_grp_name)
57 self.assertEqual(Direction.ingress.value, settings.direction.value)
59 def test_config_name_and_direction(self):
60 settings = SecurityGroupRuleSettings(
61 **{'sec_grp_name': 'foo', 'direction': 'ingress'})
62 self.assertEqual('foo', settings.sec_grp_name)
63 self.assertEqual(Direction.ingress.value, settings.direction.value)
65 def test_proto_ah_str(self):
66 settings = SecurityGroupRuleSettings(
67 **{'sec_grp_name': 'foo', 'direction': 'ingress',
69 self.assertEqual('foo', settings.sec_grp_name)
70 self.assertEqual(Direction.ingress.value, settings.direction.value)
71 self.assertEqual(Protocol.ah.value, settings.protocol.value)
73 def test_proto_ah_value(self):
74 settings = SecurityGroupRuleSettings(
75 **{'sec_grp_name': 'foo', 'direction': 'ingress',
77 self.assertEqual('foo', settings.sec_grp_name)
78 self.assertEqual(Direction.ingress.value, settings.direction.value)
79 self.assertEqual(Protocol.ah.value, settings.protocol.value)
81 def test_proto_any(self):
82 settings = SecurityGroupRuleSettings(
83 **{'sec_grp_name': 'foo', 'direction': 'ingress',
85 self.assertEqual('foo', settings.sec_grp_name)
86 self.assertEqual(Direction.ingress.value, settings.direction.value)
87 self.assertEqual(Protocol.null.value, settings.protocol.value)
89 def test_proto_null(self):
90 settings = SecurityGroupRuleSettings(
91 **{'sec_grp_name': 'foo', 'direction': 'ingress',
93 self.assertEqual('foo', settings.sec_grp_name)
94 self.assertEqual(Direction.ingress.value, settings.direction.value)
95 self.assertEqual(Protocol.null.value, settings.protocol.value)
98 settings = SecurityGroupRuleSettings(
99 sec_grp_name='foo', description='fubar',
100 direction=Direction.egress, remote_group_id='rgi',
101 protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
103 remote_ip_prefix='prfx')
104 self.assertEqual('foo', settings.sec_grp_name)
105 self.assertEqual('fubar', settings.description)
106 self.assertEqual(Direction.egress.value, settings.direction.value)
107 self.assertEqual('rgi', settings.remote_group_id)
108 self.assertEqual(Protocol.icmp.value, settings.protocol.value)
109 self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
110 self.assertEqual(1, settings.port_range_min)
111 self.assertEqual(2, settings.port_range_max)
112 self.assertEqual('prfx', settings.remote_ip_prefix)
114 def test_config_all(self):
115 settings = SecurityGroupRuleSettings(
116 **{'sec_grp_name': 'foo',
117 'description': 'fubar',
118 'direction': 'egress',
119 'remote_group_id': 'rgi',
124 'remote_ip_prefix': 'prfx'})
125 self.assertEqual('foo', settings.sec_grp_name)
126 self.assertEqual('fubar', settings.description)
127 self.assertEqual(Direction.egress.value, settings.direction.value)
128 self.assertEqual('rgi', settings.remote_group_id)
129 self.assertEqual(Protocol.tcp.value, settings.protocol.value)
130 self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
131 self.assertEqual(1, settings.port_range_min)
132 self.assertEqual(2, settings.port_range_max)
133 self.assertEqual('prfx', settings.remote_ip_prefix)
136 class SecurityGroupSettingsUnitTests(unittest.TestCase):
138 Tests the construction of the SecurityGroupSettings class
141 def test_no_params(self):
142 with self.assertRaises(SecurityGroupConfigError):
143 SecurityGroupSettings()
145 def test_empty_config(self):
146 with self.assertRaises(SecurityGroupConfigError):
147 SecurityGroupSettings(**dict())
149 def test_name_only(self):
150 settings = SecurityGroupSettings(name='foo')
151 self.assertEqual('foo', settings.name)
153 def test_config_with_name_only(self):
154 settings = SecurityGroupSettings(**{'name': 'foo'})
155 self.assertEqual('foo', settings.name)
157 def test_invalid_rule(self):
158 rule_setting = SecurityGroupRuleSettings(
159 sec_grp_name='bar', direction=Direction.ingress,
160 description='test_rule_1')
161 with self.assertRaises(SecurityGroupConfigError):
162 SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
165 rule_settings = list()
166 rule_settings.append(SecurityGroupRuleSettings(
167 sec_grp_name='bar', direction=Direction.egress,
168 description='test_rule_1'))
169 rule_settings.append(SecurityGroupRuleSettings(
170 sec_grp_name='bar', direction=Direction.ingress,
171 description='test_rule_2'))
172 settings = SecurityGroupSettings(
173 name='bar', description='fubar', project_name='foo',
174 rule_settings=rule_settings)
176 self.assertEqual('bar', settings.name)
177 self.assertEqual('fubar', settings.description)
178 self.assertEqual('foo', settings.project_name)
179 self.assertEqual(rule_settings[0], settings.rule_settings[0])
180 self.assertEqual(rule_settings[1], settings.rule_settings[1])
182 def test_config_all(self):
183 settings = SecurityGroupSettings(
185 'description': 'fubar',
186 'project_name': 'foo',
188 {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
190 self.assertEqual('bar', settings.name)
191 self.assertEqual('fubar', settings.description)
192 self.assertEqual('foo', settings.project_name)
193 self.assertEqual(1, len(settings.rule_settings))
194 self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
195 self.assertEqual(Direction.ingress.value,
196 settings.rule_settings[0].direction.value)
199 class CreateSecurityGroupTests(OSIntegrationTestCase):
201 Test for the CreateSecurityGroup class defined in create_security_group.py
206 Instantiates the CreateSecurityGroup object that is responsible for
207 downloading and creating an OS image file within OpenStack
209 super(self.__class__, self).__start__()
211 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
212 self.sec_grp_name = guid + 'name'
213 self.neutron = neutron_utils.neutron_client(self.os_creds)
215 # Initialize for cleanup
216 self.sec_grp_creator = None
220 Cleans the image and downloaded image file
222 if self.sec_grp_creator:
223 self.sec_grp_creator.clean()
225 super(self.__class__, self).__clean__()
227 def test_create_group_without_rules(self):
229 Tests the creation of an OpenStack Security Group without custom rules.
232 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
233 description='hello group')
234 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
235 self.os_creds, sec_grp_settings)
236 self.sec_grp_creator.create()
238 sec_grp = neutron_utils.get_security_group(
239 self.neutron, sec_grp_settings=sec_grp_settings)
240 self.assertIsNotNone(sec_grp)
242 validation_utils.objects_equivalent(
243 self.sec_grp_creator.get_security_group(), sec_grp)
244 rules = neutron_utils.get_rules_by_security_group(
245 self.neutron, self.sec_grp_creator.get_security_group())
246 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
247 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
252 self.neutron, self.sec_grp_creator.sec_grp_settings,
253 self.sec_grp_creator.get_security_group()))
255 def test_create_group_admin_user_to_new_project(self):
257 Tests the creation of an OpenStack Security Group without custom rules.
260 sec_grp_settings = SecurityGroupConfig(
261 name=self.sec_grp_name, description='hello group',
262 project_name=self.admin_os_creds.project_name)
263 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
264 self.os_creds, sec_grp_settings)
265 self.sec_grp_creator.create()
267 sec_grp = neutron_utils.get_security_group(
268 self.neutron, sec_grp_settings=sec_grp_settings)
269 self.assertIsNotNone(sec_grp)
271 validation_utils.objects_equivalent(
272 self.sec_grp_creator.get_security_group(), sec_grp)
273 rules = neutron_utils.get_rules_by_security_group(
274 self.neutron, self.sec_grp_creator.get_security_group())
275 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
276 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
281 self.neutron, self.sec_grp_creator.sec_grp_settings,
282 self.sec_grp_creator.get_security_group(), rules))
284 def test_create_group_new_user_to_admin_project(self):
286 Tests the creation of an OpenStack Security Group without custom rules.
289 sec_grp_settings = SecurityGroupConfig(
290 name=self.sec_grp_name, description='hello group',
291 project_name=self.os_creds.project_name)
292 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
293 self.admin_os_creds, sec_grp_settings)
294 self.sec_grp_creator.create()
296 sec_grp = neutron_utils.get_security_group(
297 self.neutron, sec_grp_settings=sec_grp_settings)
298 self.assertIsNotNone(sec_grp)
300 validation_utils.objects_equivalent(
301 self.sec_grp_creator.get_security_group(), sec_grp)
302 rules = neutron_utils.get_rules_by_security_group(
303 self.neutron, self.sec_grp_creator.get_security_group())
304 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
305 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
310 self.neutron, self.sec_grp_creator.sec_grp_settings,
311 self.sec_grp_creator.get_security_group(), rules))
313 def test_create_delete_group(self):
315 Tests the creation of an OpenStack Security Group without custom rules.
318 sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
319 description='hello group')
320 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
321 self.os_creds, sec_grp_settings)
322 created_sec_grp = self.sec_grp_creator.create()
323 self.assertIsNotNone(created_sec_grp)
327 self.neutron, self.sec_grp_creator.sec_grp_settings,
328 self.sec_grp_creator.get_security_group()))
330 neutron_utils.delete_security_group(self.neutron, created_sec_grp)
331 self.assertIsNone(neutron_utils.get_security_group(
333 sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
335 self.sec_grp_creator.clean()
337 def test_create_group_with_one_simple_rule(self):
339 Tests the creation of an OpenStack Security Group with one simple
343 sec_grp_rule_settings = list()
344 sec_grp_rule_settings.append(
345 SecurityGroupRuleConfig(
346 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
347 description='test_rule_1'))
348 sec_grp_settings = SecurityGroupConfig(
349 name=self.sec_grp_name, description='hello group',
350 rule_settings=sec_grp_rule_settings)
351 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
352 self.os_creds, sec_grp_settings)
353 self.sec_grp_creator.create()
355 sec_grp = neutron_utils.get_security_group(
356 self.neutron, sec_grp_settings=sec_grp_settings)
357 validation_utils.objects_equivalent(
358 self.sec_grp_creator.get_security_group(), sec_grp)
359 rules = neutron_utils.get_rules_by_security_group(
360 self.neutron, self.sec_grp_creator.get_security_group())
361 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
362 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
367 self.neutron, self.sec_grp_creator.sec_grp_settings,
368 self.sec_grp_creator.get_security_group(), rules))
370 def test_create_group_with_one_complex_rule(self):
372 Tests the creation of an OpenStack Security Group with one simple
376 sec_grp_rule_settings = list()
377 sec_grp_rule_settings.append(
378 SecurityGroupRuleConfig(
379 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
380 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
381 port_range_min=10, port_range_max=20,
382 description='test_rule_1'))
383 sec_grp_settings = SecurityGroupConfig(
384 name=self.sec_grp_name, description='hello group',
385 rule_settings=sec_grp_rule_settings)
386 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
387 self.os_creds, sec_grp_settings)
388 self.sec_grp_creator.create()
390 sec_grp = neutron_utils.get_security_group(
391 self.neutron, sec_grp_settings=sec_grp_settings)
392 validation_utils.objects_equivalent(
393 self.sec_grp_creator.get_security_group(), sec_grp)
394 rules = neutron_utils.get_rules_by_security_group(
395 self.neutron, self.sec_grp_creator.get_security_group())
396 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
397 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
402 self.neutron, self.sec_grp_creator.sec_grp_settings,
403 self.sec_grp_creator.get_security_group(), rules))
405 def test_create_group_with_several_rules(self):
407 Tests the creation of an OpenStack Security Group with one simple
411 sec_grp_rule_settings = list()
412 sec_grp_rule_settings.append(
413 SecurityGroupRuleConfig(
414 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
415 description='test_rule_1'))
416 sec_grp_rule_settings.append(
417 SecurityGroupRuleConfig(
418 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
419 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
420 description='test_rule_2'))
421 sec_grp_rule_settings.append(
422 SecurityGroupRuleConfig(
423 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
424 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
425 port_range_min=10, port_range_max=20,
426 description='test_rule_3'))
427 sec_grp_settings = SecurityGroupConfig(
428 name=self.sec_grp_name, description='hello group',
429 rule_settings=sec_grp_rule_settings)
430 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
431 self.os_creds, sec_grp_settings)
432 self.sec_grp_creator.create()
434 sec_grp = neutron_utils.get_security_group(
435 self.neutron, sec_grp_settings=sec_grp_settings)
436 validation_utils.objects_equivalent(
437 self.sec_grp_creator.get_security_group(), sec_grp)
438 rules = neutron_utils.get_rules_by_security_group(
439 self.neutron, self.sec_grp_creator.get_security_group())
440 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
441 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
446 self.neutron, self.sec_grp_creator.sec_grp_settings,
447 self.sec_grp_creator.get_security_group(), rules))
449 def test_add_rule(self):
451 Tests the creation of an OpenStack Security Group with one simple
452 custom rule then adds one after creation.
455 sec_grp_rule_settings = list()
456 sec_grp_rule_settings.append(
457 SecurityGroupRuleConfig(
458 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
459 description='test_rule_1'))
460 sec_grp_settings = SecurityGroupConfig(
461 name=self.sec_grp_name, description='hello group',
462 rule_settings=sec_grp_rule_settings)
463 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
464 self.os_creds, sec_grp_settings)
465 self.sec_grp_creator.create()
467 sec_grp = neutron_utils.get_security_group(
468 self.neutron, sec_grp_settings=sec_grp_settings)
469 validation_utils.objects_equivalent(
470 self.sec_grp_creator.get_security_group(), sec_grp)
472 rules = neutron_utils.get_rules_by_security_group(
473 self.neutron, self.sec_grp_creator.get_security_group())
477 self.neutron, self.sec_grp_creator.sec_grp_settings,
478 self.sec_grp_creator.get_security_group(), rules))
480 rules = neutron_utils.get_rules_by_security_group(
481 self.neutron, self.sec_grp_creator.get_security_group())
482 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
483 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
486 self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
487 sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
488 direction=Direction.egress, protocol=Protocol.icmp,
489 description='test_rule_2'))
490 rules2 = neutron_utils.get_rules_by_security_group(
491 self.neutron, self.sec_grp_creator.get_security_group())
492 self.assertEqual(len(rules) + 1, len(rules2))
494 def test_remove_rule_by_id(self):
496 Tests the creation of an OpenStack Security Group with two simple
497 custom rules then removes one by the rule ID.
500 sec_grp_rule_settings = list()
501 sec_grp_rule_settings.append(
502 SecurityGroupRuleConfig(
503 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
504 description='test_rule_1'))
505 sec_grp_rule_settings.append(
506 SecurityGroupRuleConfig(
507 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
508 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
509 description='test_rule_2'))
510 sec_grp_rule_settings.append(
511 SecurityGroupRuleConfig(
512 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
513 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
514 port_range_min=10, port_range_max=20,
515 description='test_rule_3'))
516 sec_grp_settings = SecurityGroupConfig(
517 name=self.sec_grp_name, description='hello group',
518 rule_settings=sec_grp_rule_settings)
519 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
520 self.os_creds, sec_grp_settings)
521 self.sec_grp_creator.create()
523 sec_grp = neutron_utils.get_security_group(
524 self.neutron, sec_grp_settings=sec_grp_settings)
525 validation_utils.objects_equivalent(
526 self.sec_grp_creator.get_security_group(), sec_grp)
527 rules = neutron_utils.get_rules_by_security_group(
528 self.neutron, self.sec_grp_creator.get_security_group())
529 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
530 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
535 self.neutron, self.sec_grp_creator.sec_grp_settings,
536 self.sec_grp_creator.get_security_group(), rules))
538 self.sec_grp_creator.remove_rule(
540 rules_after_del = neutron_utils.get_rules_by_security_group(
542 self.sec_grp_creator.get_security_group())
543 self.assertEqual(len(rules) - 1, len(rules_after_del))
545 def test_remove_rule_by_setting(self):
547 Tests the creation of an OpenStack Security Group with two simple
548 custom rules then removes one by the rule setting object
551 sec_grp_rule_settings = list()
552 sec_grp_rule_settings.append(
553 SecurityGroupRuleConfig(
554 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
555 description='test_rule_1'))
556 sec_grp_rule_settings.append(
557 SecurityGroupRuleConfig(
558 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
559 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
560 description='test_rule_2'))
561 sec_grp_rule_settings.append(
562 SecurityGroupRuleConfig(
563 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
564 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
565 port_range_min=10, port_range_max=20,
566 description='test_rule_3'))
567 sec_grp_settings = SecurityGroupConfig(
568 name=self.sec_grp_name, description='hello group',
569 rule_settings=sec_grp_rule_settings)
570 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
571 self.os_creds, sec_grp_settings)
572 self.sec_grp_creator.create()
574 sec_grp = neutron_utils.get_security_group(
575 self.neutron, sec_grp_settings=sec_grp_settings)
576 validation_utils.objects_equivalent(
577 self.sec_grp_creator.get_security_group(), sec_grp)
579 rules = neutron_utils.get_rules_by_security_group(
580 self.neutron, self.sec_grp_creator.get_security_group())
581 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
582 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
587 self.neutron, self.sec_grp_creator.sec_grp_settings,
588 self.sec_grp_creator.get_security_group(), rules))
590 self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
591 rules_after_del = neutron_utils.get_rules_by_security_group(
593 self.sec_grp_creator.get_security_group())
594 self.assertEqual(len(rules) - 1, len(rules_after_del))
597 def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
599 Returns True is the settings on a security group are properly contained
600 on the SNAPS SecurityGroup domain object
601 :param neutron: the neutron client
602 :param sec_grp_settings: the security group configuration
603 :param sec_grp: the SNAPS-OO security group object
604 :param rules: collection of SNAPS-OO security group rule objects
607 return (sec_grp.description == sec_grp_settings.description and
608 sec_grp.name == sec_grp_settings.name and
609 validate_sec_grp_rules(
610 neutron, sec_grp_settings.rule_settings, rules))
613 def validate_sec_grp_rules(neutron, rule_settings, rules):
615 Returns True is the settings on a security group rule are properly
616 contained on the SNAPS SecurityGroupRule domain object.
617 This function will only operate on rules that contain a description as
618 this is the only means to tell if the rule is custom or defaulted by
620 :param neutron: the neutron client
621 :param rule_settings: collection of SecurityGroupRuleConfig objects
622 :param rules: a collection of SecurityGroupRule domain objects
626 for rule_setting in rule_settings:
627 if rule_setting.description:
630 sec_grp = neutron_utils.get_security_group(
631 neutron, sec_grp_name=rule_setting.sec_grp_name)
633 setting_eth_type = create_security_group.Ethertype.IPv4
634 if rule_setting.ethertype:
635 setting_eth_type = rule_setting.ethertype
642 proto_str = rule.protocol
644 if (rule.description == rule_setting.description and
645 rule.direction == rule_setting.direction.name and
646 rule.ethertype == setting_eth_type.name and
647 rule.port_range_max == rule_setting.port_range_max and
648 rule.port_range_min == rule_setting.port_range_min and
649 proto_str == str(rule_setting.protocol.value) and
650 rule.remote_group_id == rule_setting.remote_group_id and
651 rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
652 rule.security_group_id == sec_grp.id):
662 class CreateMultipleSecurityGroupTests(OSIntegrationTestCase):
664 Test for the CreateSecurityGroup class and how it interacts with security
665 groups within other projects with the same name
670 Instantiates the CreateSecurityGroup object that is responsible for
671 downloading and creating an OS image file within OpenStack
673 super(self.__class__, self).__start__()
675 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
676 self.sec_grp_name = guid + 'name'
677 self.neutron = neutron_utils.neutron_client(self.os_creds)
679 # Initialize for cleanup
680 self.admin_sec_grp_config = SecurityGroupConfig(
681 name=self.sec_grp_name, description='hello group')
682 self.sec_grp_creator_admin = OpenStackSecurityGroup(
683 self.admin_os_creds, self.admin_sec_grp_config)
684 self.sec_grp_creator_admin.create()
685 self.sec_grp_creator_proj = None
689 Cleans the image and downloaded image file
691 if self.sec_grp_creator_admin:
692 self.sec_grp_creator_admin.clean()
693 if self.sec_grp_creator_proj:
694 self.sec_grp_creator_proj.clean()
696 super(self.__class__, self).__clean__()
698 def test_sec_grp_same_name_diff_proj(self):
700 Tests the creation of an OpenStack Security Group with the same name
701 within a different project/tenant.
704 sec_grp_config = SecurityGroupConfig(
705 name=self.sec_grp_name, description='hello group')
706 self.sec_grp_creator_proj = OpenStackSecurityGroup(
707 self.os_creds, sec_grp_config)
708 self.sec_grp_creator_proj.create()
711 self.sec_grp_creator_admin.get_security_group().id,
712 self.sec_grp_creator_proj.get_security_group().id)
714 admin_sec_grp_creator = OpenStackSecurityGroup(
715 self.admin_os_creds, self.admin_sec_grp_config)
716 admin_sec_grp_creator.create()
717 self.assertEqual(self.sec_grp_creator_admin.get_security_group().id,
718 admin_sec_grp_creator.get_security_group().id)
720 proj_sec_grp_creator = OpenStackSecurityGroup(
721 self.os_creds, sec_grp_config)
722 proj_sec_grp_creator.create()
723 self.assertEqual(self.sec_grp_creator_proj.get_security_group().id,
724 proj_sec_grp_creator.get_security_group().id)