Supporting the protocol string value of 'any' for security group rules.
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (
20     SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
21     Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
22 from snaps.openstack.tests import validation_utils
23 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
24 from snaps.openstack.utils import neutron_utils
25
26 __author__ = 'spisarski'
27
28
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Unit tests covering how SecurityGroupRuleSettings objects are built,
    both from keyword arguments and from configuration dictionaries
    """

    def test_no_params(self):
        self.assertRaises(SecurityGroupRuleSettingsError,
                          SecurityGroupRuleSettings)

    def test_empty_config(self):
        empty_config = dict()
        self.assertRaises(SecurityGroupRuleSettingsError,
                          SecurityGroupRuleSettings, **empty_config)

    def test_name_only(self):
        self.assertRaises(SecurityGroupRuleSettingsError,
                          SecurityGroupRuleSettings, sec_grp_name='foo')

    def test_config_with_name_only(self):
        config = {'sec_grp_name': 'foo'}
        self.assertRaises(SecurityGroupRuleSettingsError,
                          SecurityGroupRuleSettings, **config)

    def test_name_and_direction(self):
        rule = SecurityGroupRuleSettings(direction=Direction.ingress,
                                         sec_grp_name='foo')
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_config_name_and_direction(self):
        config = {'sec_grp_name': 'foo', 'direction': 'ingress'}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_proto_ah_str(self):
        # The protocol is given by its name
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 'ah'}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)
        self.assertEqual(Protocol.ah, rule.protocol)

    def test_proto_ah_value(self):
        # The protocol is given by its numeric value
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 51}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)
        self.assertEqual(Protocol.ah, rule.protocol)

    def test_proto_any(self):
        # The string 'any' is expected to resolve to Protocol.null
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 'any'}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)
        self.assertEqual(Protocol.null, rule.protocol)

    def test_proto_null(self):
        config = {'sec_grp_name': 'foo', 'direction': 'ingress',
                  'protocol': 'null'}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)
        self.assertEqual(Protocol.null, rule.protocol)

    def test_all(self):
        rule = SecurityGroupRuleSettings(
            sec_grp_name='foo',
            description='fubar',
            direction=Direction.egress,
            remote_group_id='rgi',
            protocol=Protocol.icmp,
            ethertype=Ethertype.IPv6,
            port_range_min=1,
            port_range_max=2,
            remote_ip_prefix='prfx')
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress, rule.direction)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(Protocol.icmp, rule.protocol)
        self.assertEqual(Ethertype.IPv6, rule.ethertype)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)

    def test_config_all(self):
        config = {
            'sec_grp_name': 'foo',
            'description': 'fubar',
            'direction': 'egress',
            'remote_group_id': 'rgi',
            'protocol': 'tcp',
            'ethertype': 'IPv6',
            'port_range_min': 1,
            'port_range_max': 2,
            'remote_ip_prefix': 'prfx',
        }
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress, rule.direction)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(Protocol.tcp, rule.protocol)
        self.assertEqual(Ethertype.IPv6, rule.ethertype)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)
131
132
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Unit tests covering how SecurityGroupSettings objects are built, both
    from keyword arguments and from configuration dictionaries
    """

    def test_no_params(self):
        self.assertRaises(SecurityGroupSettingsError, SecurityGroupSettings)

    def test_empty_config(self):
        empty_config = dict()
        self.assertRaises(SecurityGroupSettingsError,
                          SecurityGroupSettings, **empty_config)

    def test_name_only(self):
        group = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', group.name)

    def test_config_with_name_only(self):
        config = {'name': 'foo'}
        group = SecurityGroupSettings(**config)
        self.assertEqual('foo', group.name)

    def test_invalid_rule(self):
        # The rule names group 'bar' while the group itself is named 'foo'
        mismatched_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.ingress,
            description='test_rule_1')
        self.assertRaises(SecurityGroupSettingsError,
                          SecurityGroupSettings,
                          name='foo', rule_settings=[mismatched_rule])

    def test_all(self):
        egress_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.egress,
            description='test_rule_1')
        ingress_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.ingress,
            description='test_rule_2')
        expected_rules = [egress_rule, ingress_rule]

        group = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo',
            rule_settings=expected_rules)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(expected_rules[0], group.rule_settings[0])
        self.assertEqual(expected_rules[1], group.rule_settings[1])

    def test_config_all(self):
        config = {
            'name': 'bar',
            'description': 'fubar',
            'project_name': 'foo',
            'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}],
        }
        group = SecurityGroupSettings(**config)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(1, len(group.rule_settings))

        only_rule = group.rule_settings[0]
        self.assertEqual('bar', only_rule.sec_grp_name)
        self.assertEqual(Direction.ingress, only_rule.direction)
194
195
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Integration tests for the OpenStackSecurityGroup creator defined in
    create_security_group.py
    """

    def setUp(self):
        """
        Invokes the parent's __start__ hook, generates a unique security
        group name and obtains the neutron client used for validation
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group created by the test (if any) and invokes
        the parent's __clean__ hook
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _create_sec_grp_and_validate(self, sec_grp_settings, os_creds=None):
        """
        Creates the security group described by sec_grp_settings, stores the
        creator on self.sec_grp_creator so tearDown can clean it, and checks
        the result against what neutron reports
        :param sec_grp_settings: the SecurityGroupSettings to create
        :param os_creds: the credentials handed to the creator (defaults to
                         self.os_creds)
        :return: the rules neutron returns for the created security group
        """
        if os_creds is None:
            os_creds = self.os_creds

        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            os_creds, sec_grp_settings)
        self.sec_grp_creator.create()

        sec_grp = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp)

        # NOTE(review): the value returned by objects_equivalent() is
        # discarded here and below, so these calls can only fail by raising.
        # Confirm whether it returns a bool that ought to be asserted.
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(), rules))

        return rules

    def _several_rule_settings(self):
        """
        Builds the three custom rule settings shared by the multi-rule tests
        :return: list of SecurityGroupRuleSettings for self.sec_grp_name
        """
        return [
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1'),
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'),
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'),
        ]

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self._create_sec_grp_and_validate(sec_grp_settings)

    def test_create_group_admin_user_to_new_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules using self.os_creds while the settings name the project of
        self.admin_os_creds.
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            project_name=self.admin_os_creds.project_name)
        self._create_sec_grp_and_validate(sec_grp_settings)

    def test_create_group_new_user_to_admin_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules using self.admin_os_creds while the settings name the project
        of self.os_creds.
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            project_name=self.os_creds.project_name)
        self._create_sec_grp_and_validate(
            sec_grp_settings, os_creds=self.admin_os_creds)

    def test_create_delete_group(self):
        """
        Tests that the creator copes with its OpenStack Security Group
        having been deleted behind its back before clean() is called.
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, sec_grp_settings)
        created_sec_grp = self.sec_grp_creator.create()
        self.assertIsNotNone(created_sec_grp)

        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group()))

        # Delete directly through neutron, bypassing the creator
        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron,
            sec_grp_settings=self.sec_grp_creator.sec_grp_settings))

        # Must not raise even though the group is already gone
        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1')]
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self._create_sec_grp_and_validate(sec_grp_settings)

    def test_create_group_with_one_complex_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one custom
        rule that sets protocol, ethertype and a port range.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_1')]
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        self._create_sec_grp_and_validate(sec_grp_settings)

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=self._several_rule_settings())
        self._create_sec_grp_and_validate(sec_grp_settings)

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
                description='test_rule_1')]
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        rules = self._create_sec_grp_and_validate(sec_grp_settings)

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp,
            description='test_rule_2'))
        rules2 = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=self._several_rule_settings())
        rules = self._create_sec_grp_and_validate(sec_grp_settings)

        self.sec_grp_creator.remove_rule(
            rule_id=rules[0].id)
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        sec_grp_rule_settings = self._several_rule_settings()
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=sec_grp_rule_settings)
        rules = self._create_sec_grp_and_validate(sec_grp_settings)

        self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron,
            self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))
592
593
def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=None):
    """
    Returns True if the settings on a security group are properly contained
    on the SNAPS SecurityGroup domain object
    :param neutron: the neutron client
    :param sec_grp_settings: the security group configuration
    :param sec_grp: the SNAPS-OO security group object
    :param rules: collection of SNAPS-OO security group rule objects
                  (treated as an empty collection when omitted)
    :return: T/F
    """
    # A fresh list per call avoids sharing one mutable default object
    # between every caller that omits the argument
    if rules is None:
        rules = list()

    # Rule validation is only reached when description and name both match
    return (sec_grp.description == sec_grp_settings.description and
            sec_grp.name == sec_grp_settings.name and
            validate_sec_grp_rules(
                neutron, sec_grp_settings.rule_settings, rules))
608
609
def validate_sec_grp_rules(neutron, rule_settings, rules):
    """
    Returns True if the settings on a security group rule are properly
    contained on the SNAPS SecurityGroupRule domain object.
    This function will only operate on rules that contain a description as
    this is the only means to tell if the rule is custom or defaulted by
    OpenStack
    :param neutron: the neutron client
    :param rule_settings: collection of SecurityGroupRuleSettings objects
    :param rules: a collection of SecurityGroupRule domain objects
    :return: T/F
    """

    for rule_setting in rule_settings:
        # Settings without a description are skipped (see docstring)
        if not rule_setting.description:
            continue

        # With no rule objects nothing can match; bail out before making
        # any neutron call
        if not rules:
            return False

        # The owning group and the expected ethertype depend only on the
        # setting, so resolve them once instead of once per candidate rule
        sec_grp = neutron_utils.get_security_group(
            neutron, sec_grp_name=rule_setting.sec_grp_name)
        if not sec_grp:
            return False

        # A setting without an ethertype is compared as IPv4
        setting_eth_type = (rule_setting.ethertype or
                            create_security_group.Ethertype.IPv4)

        for rule in rules:
            # A rule without a protocol is compared as the string 'null'
            proto_str = rule.protocol if rule.protocol else 'null'

            if (rule.description == rule_setting.description and
                    rule.direction == rule_setting.direction.name and
                    rule.ethertype == setting_eth_type.name and
                    rule.port_range_max == rule_setting.port_range_max and
                    rule.port_range_min == rule_setting.port_range_min and
                    proto_str == str(rule_setting.protocol.value) and
                    rule.remote_group_id == rule_setting.remote_group_id and
                    rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
                    rule.security_group_id == sec_grp.id):
                break
        else:
            # Loop finished without a break: no rule matched this setting
            return False

    return True