Merge "Changes to enable overriding the OSCreds for tests."
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (
20     SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
21     Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
22 from snaps.openstack.tests import validation_utils
23 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
24 from snaps.openstack.utils import neutron_utils
25
26 __author__ = 'spisarski'
27
28
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupRuleSettings objects, both from
    keyword arguments and from an unpacked configuration dict
    """

    def test_no_params(self):
        """
        Construction without any argument raises
        SecurityGroupRuleSettingsError
        """
        self.assertRaises(SecurityGroupRuleSettingsError,
                          SecurityGroupRuleSettings)

    def test_empty_config(self):
        """
        Construction from an empty configuration dict raises
        SecurityGroupRuleSettingsError
        """
        empty_config = dict()
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(**empty_config)

    def test_name_only(self):
        """
        Supplying only the security group name raises
        SecurityGroupRuleSettingsError
        """
        self.assertRaises(SecurityGroupRuleSettingsError,
                          SecurityGroupRuleSettings, sec_grp_name='foo')

    def test_config_with_name_only(self):
        """
        A configuration dict holding only the security group name raises
        SecurityGroupRuleSettingsError
        """
        config = {'sec_grp_name': 'foo'}
        with self.assertRaises(SecurityGroupRuleSettingsError):
            SecurityGroupRuleSettings(**config)

    def test_name_and_direction(self):
        """
        The security group name and direction given as keyword arguments
        are exposed on the resulting object
        """
        rule = SecurityGroupRuleSettings(direction=Direction.ingress,
                                         sec_grp_name='foo')
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_config_name_and_direction(self):
        """
        The direction given as the string 'ingress' in a configuration dict
        is exposed as Direction.ingress
        """
        config = {'sec_grp_name': 'foo', 'direction': 'ingress'}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_all(self):
        """
        Every attribute given as a keyword argument is exposed unchanged
        """
        rule = SecurityGroupRuleSettings(
            sec_grp_name='foo',
            description='fubar',
            direction=Direction.egress,
            remote_group_id='rgi',
            protocol=Protocol.icmp,
            ethertype=Ethertype.IPv6,
            port_range_min=1,
            port_range_max=2,
            remote_ip_prefix='prfx')

        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress, rule.direction)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(Protocol.icmp, rule.protocol)
        self.assertEqual(Ethertype.IPv6, rule.ethertype)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)

    def test_config_all(self):
        """
        Every attribute given in a configuration dict is exposed, with the
        direction, protocol and ethertype strings exposed as enum members
        """
        config = {
            'sec_grp_name': 'foo',
            'description': 'fubar',
            'direction': 'egress',
            'remote_group_id': 'rgi',
            'protocol': 'tcp',
            'ethertype': 'IPv6',
            'port_range_min': 1,
            'port_range_max': 2,
            'remote_ip_prefix': 'prfx',
        }
        rule = SecurityGroupRuleSettings(**config)

        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress, rule.direction)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(Protocol.tcp, rule.protocol)
        self.assertEqual(Ethertype.IPv6, rule.ethertype)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)
99
100
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupSettings objects, both from keyword
    arguments and from an unpacked configuration dict
    """

    def test_no_params(self):
        """
        Construction without any argument raises SecurityGroupSettingsError
        """
        self.assertRaises(SecurityGroupSettingsError, SecurityGroupSettings)

    def test_empty_config(self):
        """
        Construction from an empty configuration dict raises
        SecurityGroupSettingsError
        """
        empty_config = dict()
        with self.assertRaises(SecurityGroupSettingsError):
            SecurityGroupSettings(**empty_config)

    def test_name_only(self):
        """
        A name given as a keyword argument is enough to build the settings
        """
        sec_grp = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', sec_grp.name)

    def test_config_with_name_only(self):
        """
        A configuration dict holding only a name is enough to build the
        settings
        """
        config = {'name': 'foo'}
        sec_grp = SecurityGroupSettings(**config)
        self.assertEqual('foo', sec_grp.name)

    def test_invalid_rule(self):
        """
        Passing a rule built for group 'bar' to settings named 'foo' raises
        SecurityGroupSettingsError
        """
        mismatched_rule = SecurityGroupRuleSettings(
            description='test_rule_1', direction=Direction.ingress,
            sec_grp_name='bar')
        self.assertRaises(SecurityGroupSettingsError, SecurityGroupSettings,
                          name='foo', rule_settings=[mismatched_rule])

    def test_all(self):
        """
        Every attribute given as a keyword argument, including the rule
        settings, is exposed on the resulting object
        """
        egress_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.egress,
            description='test_rule_1')
        ingress_rule = SecurityGroupRuleSettings(
            sec_grp_name='bar', direction=Direction.ingress,
            description='test_rule_2')
        expected_rules = [egress_rule, ingress_rule]

        sec_grp = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo',
            rule_settings=expected_rules)

        self.assertEqual('bar', sec_grp.name)
        self.assertEqual('fubar', sec_grp.description)
        self.assertEqual('foo', sec_grp.project_name)
        self.assertEqual(expected_rules[0], sec_grp.rule_settings[0])
        self.assertEqual(expected_rules[1], sec_grp.rule_settings[1])

    def test_config_all(self):
        """
        Every attribute given in a configuration dict is exposed, with the
        'rules' entry exposed as rule settings objects
        """
        config = {
            'name': 'bar',
            'description': 'fubar',
            'project_name': 'foo',
            'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}],
        }
        sec_grp = SecurityGroupSettings(**config)

        self.assertEqual('bar', sec_grp.name)
        self.assertEqual('fubar', sec_grp.description)
        self.assertEqual('foo', sec_grp.project_name)
        self.assertEqual(1, len(sec_grp.rule_settings))

        only_rule = sec_grp.rule_settings[0]
        self.assertEqual('bar', only_rule.sec_grp_name)
        self.assertEqual(Direction.ingress, only_rule.direction)
162
163
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Integration tests for the OpenStackSecurityGroup creator defined in
    create_security_group.py
    """

    def setUp(self):
        """
        Starts the integration test fixture, generates a unique security
        group name and creates the neutron client used for validation
        """
        # The class is named explicitly because super(self.__class__, self)
        # recurses forever as soon as this test case is subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group created by the test, if any, then cleans
        the integration test fixture
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _ingress_rule(self):
        """
        Returns the settings of a simple ingress rule for this test's group
        :return: a SecurityGroupRuleSettings object
        """
        return SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
            description='test_rule_1')

    def _three_rules(self):
        """
        Returns the settings of three custom rules for this test's group
        :return: list of SecurityGroupRuleSettings objects
        """
        return [
            self._ingress_rule(),
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv6,
                description='test_rule_2'),
            SecurityGroupRuleSettings(
                sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                port_range_min=10, port_range_max=20,
                description='test_rule_3'),
        ]

    def _create_sec_grp(self, os_creds=None, **settings_kwargs):
        """
        Creates the security group under test and stores its creator in
        self.sec_grp_creator so that tearDown() can clean it
        :param os_creds: the credentials handed to the creator
                         (self.os_creds when None)
        :param settings_kwargs: extra SecurityGroupSettings arguments such as
                                rule_settings or project_name
        :return: the object returned by the creator's create() method
        """
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            **settings_kwargs)
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds if os_creds is None else os_creds,
            sec_grp_settings)
        return self.sec_grp_creator.create()

    def _assert_matches_neutron(self):
        """
        Looks the security group and its rules up with the neutron client and
        compares them with what the creator holds
        :return: the rules retrieved with the neutron client
        """
        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        self.assertIsNotNone(sec_grp)

        # NOTE(review): the results of objects_equivalent() are not asserted;
        # presumably it returns a bool - confirm against validation_utils
        # before wrapping these calls in assertTrue()
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)
        return rules

    def _assert_settings_applied(self, rules=None):
        """
        Asserts that validate_sec_grp() accepts the created security group
        :param rules: the rules to validate against the rule settings
                      (an empty list when None)
        """
        self.assertTrue(
            validate_sec_grp(
                self.neutron, self.sec_grp_creator.sec_grp_settings,
                self.sec_grp_creator.get_security_group(),
                [] if rules is None else rules))

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        self._create_sec_grp()
        self._assert_matches_neutron()
        self._assert_settings_applied()

    def test_create_group_admin_user_to_new_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules using self.os_creds while the settings name the project of
        self.admin_os_creds.
        """
        # NOTE(review): the method name suggests the admin credentials
        # creating into the new project, which is what
        # test_create_group_new_user_to_admin_project does - confirm whether
        # the two names are swapped
        self._create_sec_grp(
            project_name=self.admin_os_creds.project_name)
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

    def test_create_group_new_user_to_admin_project(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules using self.admin_os_creds while the settings name the project
        of self.os_creds.
        """
        self._create_sec_grp(
            os_creds=self.admin_os_creds,
            project_name=self.os_creds.project_name)
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

    def test_create_delete_group(self):
        """
        Tests that the creator's clean() method still succeeds after the
        security group has been deleted directly with the neutron client.
        """
        created_sec_grp = self._create_sec_grp()
        self.assertIsNotNone(created_sec_grp)
        self._assert_settings_applied()

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        self._create_sec_grp(rule_settings=[self._ingress_rule()])
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

    def test_create_group_with_one_complex_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one custom
        rule that sets the protocol, ethertype and port range.
        """
        complex_rule = SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_name, direction=Direction.egress,
            protocol=Protocol.udp, ethertype=Ethertype.IPv4,
            port_range_min=10, port_range_max=20,
            description='test_rule_1')
        self._create_sec_grp(rule_settings=[complex_rule])
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        self._create_sec_grp(rule_settings=self._three_rules())
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        self._create_sec_grp(rule_settings=[self._ingress_rule()])
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp,
            description='test_rule_2'))
        rules2 = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        self._create_sec_grp(rule_settings=self._three_rules())
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

        self.sec_grp_creator.remove_rule(rule_id=rules[0].id)
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        sec_grp_rule_settings = self._three_rules()
        self._create_sec_grp(rule_settings=sec_grp_rule_settings)
        rules = self._assert_matches_neutron()
        self._assert_settings_applied(rules)

        self.sec_grp_creator.remove_rule(
            rule_setting=sec_grp_rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))
559
560
def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=None):
    """
    Returns True if the settings on a security group are properly contained
    on the SNAPS SecurityGroup domain object
    :param neutron: the neutron client
    :param sec_grp_settings: the security group configuration
    :param sec_grp: the SNAPS-OO security group object
    :param rules: collection of SNAPS-OO security group rule objects
                  (treated as empty when None)
    :return: T/F
    """
    # A None sentinel replaces a mutable default list that would be shared
    # between calls
    if rules is None:
        rules = []

    return (sec_grp.description == sec_grp_settings.description and
            sec_grp.name == sec_grp_settings.name and
            validate_sec_grp_rules(
                neutron, sec_grp_settings.rule_settings, rules))
575
576
def _rule_matches_setting(rule, rule_setting, proto_name, eth_type_name,
                          sec_grp_id):
    """
    Returns True if a SecurityGroupRule domain object carries the values
    requested by a SecurityGroupRuleSettings object
    :param rule: the SecurityGroupRule domain object
    :param rule_setting: the SecurityGroupRuleSettings object
    :param proto_name: the expected protocol name (None when unset)
    :param eth_type_name: the expected ethertype name
    :param sec_grp_id: the ID of the security group the rule must belong to
    :return: T/F
    """
    return (rule.description == rule_setting.description and
            rule.direction == rule_setting.direction.name and
            rule.ethertype == eth_type_name and
            rule.port_range_max == rule_setting.port_range_max and
            rule.port_range_min == rule_setting.port_range_min and
            rule.protocol == proto_name and
            rule.remote_group_id == rule_setting.remote_group_id and
            rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
            rule.security_group_id == sec_grp_id)


def validate_sec_grp_rules(neutron, rule_settings, rules):
    """
    Returns True if the settings on a security group rule are properly
    contained on the SNAPS SecurityGroupRule domain object.
    This function will only operate on rules that contain a description as
    this is the only means to tell if the rule is custom or defaulted by
    OpenStack
    :param neutron: the neutron client
    :param rule_settings: collection of SecurityGroupRuleSettings objects
    :param rules: a collection of SecurityGroupRule domain objects
    :return: T/F
    """

    for rule_setting in rule_settings:
        if not rule_setting.description:
            continue

        # Without candidate rules nothing can match, so neutron does not
        # need to be queried
        if not rules:
            return False

        # The expected values depend only on the rule setting, so they are
        # computed (and neutron is queried) once per setting rather than
        # once per candidate rule
        if rule_setting.protocol == Protocol.null:
            setting_proto = None
        else:
            setting_proto = rule_setting.protocol.name

        sec_grp = neutron_utils.get_security_group(
            neutron, rule_setting.sec_grp_name)

        # The ethertype falls back to IPv4 when the setting has none
        setting_eth_type = create_security_group.Ethertype.IPv4
        if rule_setting.ethertype:
            setting_eth_type = rule_setting.ethertype

        if not sec_grp:
            return False

        if not any(_rule_matches_setting(rule, rule_setting, setting_proto,
                                         setting_eth_type.name, sec_grp.id)
                   for rule in rules):
            return False

    return True
623
624     return True