Changed the pattern for how objects look themselves up by name and project.
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.config.security_group import (
19     SecurityGroupConfig,  SecurityGroupRuleConfig,
20     SecurityGroupRuleConfigError, SecurityGroupConfigError)
21 from snaps.openstack import create_security_group
22 from snaps.openstack.create_security_group import (
23     SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
24     Protocol, OpenStackSecurityGroup)
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils
28
29 __author__ = 'spisarski'
30
31
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupRuleSettings objects from keyword
    arguments and from configuration dictionaries
    """

    def _check_foo_ingress(self, rule):
        """
        Asserts that the rule targets the group 'foo' in the ingress direction
        :param rule: the SecurityGroupRuleSettings object under test
        """
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress.value, rule.direction.value)

    def _check_full_rule(self, rule, protocol):
        """
        Asserts every attribute populated by the "all" tests
        :param rule: the SecurityGroupRuleSettings object under test
        :param protocol: the Protocol enumeration member the rule must hold
        """
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress.value, rule.direction.value)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(protocol.value, rule.protocol.value)
        self.assertEqual(Ethertype.IPv6.value, rule.ethertype.value)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)

    def test_no_params(self):
        """Construction without any argument raises the config error"""
        self.assertRaises(
            SecurityGroupRuleConfigError, SecurityGroupRuleSettings)

    def test_empty_config(self):
        """Construction from an empty dict raises the config error"""
        empty_config = {}
        self.assertRaises(
            SecurityGroupRuleConfigError, SecurityGroupRuleSettings,
            **empty_config)

    def test_name_only(self):
        """A group name without a direction raises the config error"""
        self.assertRaises(
            SecurityGroupRuleConfigError, SecurityGroupRuleSettings,
            sec_grp_name='foo')

    def test_config_with_name_only(self):
        """A dict holding only the group name raises the config error"""
        config = {'sec_grp_name': 'foo'}
        self.assertRaises(
            SecurityGroupRuleConfigError, SecurityGroupRuleSettings,
            **config)

    def test_name_and_direction(self):
        """A group name and a Direction member build a rule"""
        rule = SecurityGroupRuleSettings(
            direction=Direction.ingress, sec_grp_name='foo')
        self._check_foo_ingress(rule)

    def test_config_name_and_direction(self):
        """The direction may be given by its string name"""
        config = {'sec_grp_name': 'foo', 'direction': 'ingress'}
        rule = SecurityGroupRuleSettings(**config)
        self._check_foo_ingress(rule)

    def test_proto_ah_str(self):
        """The protocol string 'ah' yields Protocol.ah"""
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 'ah'}
        rule = SecurityGroupRuleSettings(**config)
        self._check_foo_ingress(rule)
        self.assertEqual(Protocol.ah.value, rule.protocol.value)

    def test_proto_ah_value(self):
        """The protocol number 51 yields Protocol.ah"""
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 51}
        rule = SecurityGroupRuleSettings(**config)
        self._check_foo_ingress(rule)
        self.assertEqual(Protocol.ah.value, rule.protocol.value)

    def test_proto_any(self):
        """The protocol string 'any' yields Protocol.null"""
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 'any'}
        rule = SecurityGroupRuleSettings(**config)
        self._check_foo_ingress(rule)
        self.assertEqual(Protocol.null.value, rule.protocol.value)

    def test_proto_null(self):
        """The protocol string 'null' yields Protocol.null"""
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 'null'}
        rule = SecurityGroupRuleSettings(**config)
        self._check_foo_ingress(rule)
        self.assertEqual(Protocol.null.value, rule.protocol.value)

    def test_all(self):
        """Every supported keyword argument is stored on the rule"""
        rule = SecurityGroupRuleSettings(
            sec_grp_name='foo',
            description='fubar',
            direction=Direction.egress,
            remote_group_id='rgi',
            protocol=Protocol.icmp,
            ethertype=Ethertype.IPv6,
            port_range_min=1,
            port_range_max=2,
            remote_ip_prefix='prfx')
        self._check_full_rule(rule, Protocol.icmp)

    def test_config_all(self):
        """Every supported configuration key is stored on the rule"""
        config = {
            'sec_grp_name': 'foo',
            'description': 'fubar',
            'direction': 'egress',
            'remote_group_id': 'rgi',
            'protocol': 'tcp',
            'ethertype': 'IPv6',
            'port_range_min': 1,
            'port_range_max': 2,
            'remote_ip_prefix': 'prfx',
        }
        rule = SecurityGroupRuleSettings(**config)
        self._check_full_rule(rule, Protocol.tcp)
134
135
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupSettings objects from keyword
    arguments and from configuration dictionaries
    """

    def test_no_params(self):
        """Construction without any argument raises the config error"""
        self.assertRaises(SecurityGroupConfigError, SecurityGroupSettings)

    def test_empty_config(self):
        """Construction from an empty dict raises the config error"""
        empty_config = {}
        self.assertRaises(
            SecurityGroupConfigError, SecurityGroupSettings, **empty_config)

    def test_name_only(self):
        """A name alone is enough to build the settings"""
        group = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', group.name)

    def test_config_with_name_only(self):
        """A dict holding only the name is enough to build the settings"""
        config = {'name': 'foo'}
        group = SecurityGroupSettings(**config)
        self.assertEqual('foo', group.name)

    def test_invalid_rule(self):
        """
        A rule whose sec_grp_name ('bar') differs from the group name ('foo')
        raises the config error
        """
        foreign_rule = SecurityGroupRuleSettings(
            description='test_rule_1', direction=Direction.ingress,
            sec_grp_name='bar')
        self.assertRaises(
            SecurityGroupConfigError, SecurityGroupSettings,
            name='foo', rule_settings=[foreign_rule])

    def test_all(self):
        """Every supported keyword argument is stored on the settings"""
        egress_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.egress,
            description='test_rule_1')
        ingress_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.ingress,
            description='test_rule_2')
        rule_settings = [egress_rule, ingress_rule]

        group = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo',
            rule_settings=rule_settings)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(rule_settings[0], group.rule_settings[0])
        self.assertEqual(rule_settings[1], group.rule_settings[1])

    def test_config_all(self):
        """Every supported configuration key is stored on the settings"""
        config = {
            'name': 'bar',
            'description': 'fubar',
            'project_name': 'foo',
            'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}],
        }
        group = SecurityGroupSettings(**config)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(1, len(group.rule_settings))

        only_rule = group.rule_settings[0]
        self.assertEqual('bar', only_rule.sec_grp_name)
        self.assertEqual(Direction.ingress.value, only_rule.direction.value)
197
198
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Tests for the OpenStackSecurityGroup class defined in
    create_security_group.py
    """

    def setUp(self):
        """
        Generates a unique security group name and instantiates the neutron
        client used by the tests
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # infinitely as soon as this test case gets subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group created by the test, if any
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        # Create Security Group
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
                                               description='hello group')
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp)

        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group()))

    def test_create_group_admin_user_to_new_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules
        using the admin credentials while the configuration names the project
        of self.os_creds, then checks that a creator using self.os_creds
        resolves to the very same security group.
        """
        # Create Security Group
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            project_name=self.os_creds.project_name)
        self.sec_grp_creator = OpenStackSecurityGroup(
            self.admin_os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp)

        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        self.assertEqual(self.sec_grp_creator.get_security_group().id,
                         sec_grp.id)

        # A second creator using the project's own credentials must find the
        # group created above
        proj_creator = OpenStackSecurityGroup(
            self.os_creds, SecurityGroupConfig(name=self.sec_grp_name))
        proj_creator.create()

        self.assertEqual(self.sec_grp_creator.get_security_group().id,
                         proj_creator.get_security_group().id)

    def test_create_group_new_user_to_admin_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules
        using the admin credentials while the configuration names the project
        of self.os_creds.
        """
        # NOTE(review): the credentials and project used here are the same as
        # in test_create_group_admin_user_to_new_project although the test
        # name suggests the opposite combination - confirm this is intended
        # Create Security Group
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            project_name=self.os_creds.project_name)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.admin_os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp)

        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_delete_group(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules
        followed by its deletion through neutron_utils, and ensures clean()
        still completes once the group is gone.
        """
        # Create Security Group
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
                                               description='hello group')
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        created_sec_grp = self.sec_grp_creator.create()
        self.assertIsNotNone(created_sec_grp)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group()))

        # Delete behind the creator's back and verify the group is gone
        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.keystone,
            sec_grp_settings=self.sec_grp_creator.sec_grp_settings))

        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        # Create Security Group
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_group_with_one_complex_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one custom
        rule that sets the protocol, the ethertype and a port range.
        """
        # Create Security Group
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_1'))
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        # Create Security Group
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'))
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        # Create Security Group
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        # Add a second rule and expect exactly one more rule on the group
        self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp,
            description='test_rule_2'))
        rules2 = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        # Create Security Group
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'))
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        # Remove the first rule by ID and expect exactly one rule less
        self.sec_grp_creator.remove_rule(
            rule_id=rules[0].id)
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        # Create Security Group
        sec_grp_rule_settings = list()
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'))
        sec_grp_rule_settings.append(
            SecurityGroupRuleConfig(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'))
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.keystone,
                self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        # Remove the first configured rule and expect exactly one rule less
        self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))
615
616
def validate_sec_grp(neutron, keystone, sec_grp_settings, sec_grp,
                     rules=None):
    """
    Returns True if the settings on a security group are properly contained
    on the SNAPS SecurityGroup domain object
    :param neutron: the neutron client
    :param keystone: the keystone client
    :param sec_grp_settings: the security group configuration
    :param sec_grp: the SNAPS-OO security group object
    :param rules: collection of SNAPS-OO security group rule objects
                  (treated as empty when omitted or None)
    :return: T/F
    """
    # A None sentinel replaces the former mutable default (list()), which
    # was a single list object shared by every call
    if rules is None:
        rules = list()

    return (sec_grp.description == sec_grp_settings.description and
            sec_grp.name == sec_grp_settings.name and
            validate_sec_grp_rules(
                neutron, keystone, sec_grp_settings.rule_settings, rules))
633
634
def validate_sec_grp_rules(neutron, keystone, rule_settings, rules):
    """
    Returns True if the settings on a security group rule are properly
    contained on the SNAPS SecurityGroupRule domain object.
    This function will only operate on rules that contain a description as
    this is the only means to tell if the rule is custom or defaulted by
    OpenStack
    :param neutron: the neutron client
    :param keystone: the keystone client
    :param rule_settings: collection of SecurityGroupRuleConfig objects
    :param rules: a collection of SecurityGroupRule domain objects
    :return: T/F
    """

    for rule_setting in rule_settings:
        # Settings without a description are not validated (see above)
        if not rule_setting.description:
            continue

        # Nothing to compare against, so this setting cannot be matched;
        # bail out before querying OpenStack
        if not rules:
            return False

        # The owning security group and the expected ethertype depend only
        # on the setting, so resolve them once rather than once per rule
        sec_grp = neutron_utils.get_security_group(
            neutron, keystone, sec_grp_name=rule_setting.sec_grp_name)
        if not sec_grp:
            return False

        # IPv4 is the ethertype expected when the setting does not name one
        setting_eth_type = create_security_group.Ethertype.IPv4
        if rule_setting.ethertype:
            setting_eth_type = rule_setting.ethertype

        for rule in rules:
            # A rule without a protocol is compared as the string 'null'
            proto_str = rule.protocol if rule.protocol else 'null'

            if (rule.description == rule_setting.description and
                    rule.direction == rule_setting.direction.name and
                    rule.ethertype == setting_eth_type.name and
                    rule.port_range_max == rule_setting.port_range_max and
                    rule.port_range_min == rule_setting.port_range_min and
                    proto_str == str(rule_setting.protocol.value) and
                    rule.remote_group_id == rule_setting.remote_group_id and
                    rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
                    rule.security_group_id == sec_grp.id):
                break
        else:
            # No rule carries the values of this setting
            return False

    return True
683
684
class CreateMultipleSecurityGroupTests(OSIntegrationTestCase):
    """
    Test for the OpenStackSecurityGroup class and how it interacts with
    security groups within other projects with the same name
    """

    def setUp(self):
        """
        Generates a unique security group name, instantiates the neutron
        client and creates a security group with that name using the admin
        credentials
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # infinitely as soon as this test case gets subclassed
        super(CreateMultipleSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.admin_sec_grp_config = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.sec_grp_creator_admin = OpenStackSecurityGroup(
            self.admin_os_creds, self.admin_sec_grp_config)
        self.sec_grp_creator_admin.create()
        self.sec_grp_creator_proj = None

    def tearDown(self):
        """
        Cleans the security groups created by setUp and by the test
        """
        if self.sec_grp_creator_admin:
            self.sec_grp_creator_admin.clean()
        if self.sec_grp_creator_proj:
            self.sec_grp_creator_proj.clean()

        super(CreateMultipleSecurityGroupTests, self).__clean__()

    def test_sec_grp_same_name_diff_proj(self):
        """
        Tests the creation of an OpenStack Security Group with the same name
        within a different project/tenant.
        """
        # Create Security Group
        sec_grp_config = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.sec_grp_creator_proj = OpenStackSecurityGroup(
            self.os_creds, sec_grp_config)
        self.sec_grp_creator_proj.create()

        # Same name but different credentials must yield distinct groups
        self.assertNotEqual(
            self.sec_grp_creator_admin.get_security_group().id,
            self.sec_grp_creator_proj.get_security_group().id)

        # A new creator with the admin credentials must resolve to the
        # group created in setUp
        admin_sec_grp_creator = OpenStackSecurityGroup(
            self.admin_os_creds, self.admin_sec_grp_config)
        admin_sec_grp_creator.create()
        self.assertEqual(self.sec_grp_creator_admin.get_security_group().id,
                         admin_sec_grp_creator.get_security_group().id)

        # A new creator with the project credentials must resolve to the
        # group created above
        proj_sec_grp_creator = OpenStackSecurityGroup(
            self.os_creds, sec_grp_config)
        proj_sec_grp_creator.create()
        self.assertEqual(self.sec_grp_creator_proj.get_security_group().id,
                         proj_sec_grp_creator.get_security_group().id)