Refactoring of SecurityGroupSettings to extend SecurityGroupConfig
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.config.security_group import (
19     SecurityGroupConfig,  SecurityGroupRuleConfig,
20     SecurityGroupRuleConfigError, SecurityGroupConfigError)
21 from snaps.openstack import create_security_group
22 from snaps.openstack.create_security_group import (
23     SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
24     Protocol)
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils
28
29 __author__ = 'spisarski'
30
31
32 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
33     """
34     Tests the construction of the SecurityGroupRuleSettings class
35     """
36
37     def test_no_params(self):
38         with self.assertRaises(SecurityGroupRuleConfigError):
39             SecurityGroupRuleSettings()
40
41     def test_empty_config(self):
42         with self.assertRaises(SecurityGroupRuleConfigError):
43             SecurityGroupRuleSettings(**dict())
44
45     def test_name_only(self):
46         with self.assertRaises(SecurityGroupRuleConfigError):
47             SecurityGroupRuleSettings(sec_grp_name='foo')
48
49     def test_config_with_name_only(self):
50         with self.assertRaises(SecurityGroupRuleConfigError):
51             SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
52
53     def test_name_and_direction(self):
54         settings = SecurityGroupRuleSettings(sec_grp_name='foo',
55                                              direction=Direction.ingress)
56         self.assertEqual('foo', settings.sec_grp_name)
57         self.assertEqual(Direction.ingress.value, settings.direction.value)
58
59     def test_config_name_and_direction(self):
60         settings = SecurityGroupRuleSettings(
61             **{'sec_grp_name': 'foo', 'direction': 'ingress'})
62         self.assertEqual('foo', settings.sec_grp_name)
63         self.assertEqual(Direction.ingress.value, settings.direction.value)
64
65     def test_proto_ah_str(self):
66         settings = SecurityGroupRuleSettings(
67             **{'sec_grp_name': 'foo', 'direction': 'ingress',
68                'protocol': 'ah'})
69         self.assertEqual('foo', settings.sec_grp_name)
70         self.assertEqual(Direction.ingress.value, settings.direction.value)
71         self.assertEqual(Protocol.ah.value, settings.protocol.value)
72
73     def test_proto_ah_value(self):
74         settings = SecurityGroupRuleSettings(
75             **{'sec_grp_name': 'foo', 'direction': 'ingress',
76                'protocol': 51})
77         self.assertEqual('foo', settings.sec_grp_name)
78         self.assertEqual(Direction.ingress.value, settings.direction.value)
79         self.assertEqual(Protocol.ah.value, settings.protocol.value)
80
81     def test_proto_any(self):
82         settings = SecurityGroupRuleSettings(
83             **{'sec_grp_name': 'foo', 'direction': 'ingress',
84                'protocol': 'any'})
85         self.assertEqual('foo', settings.sec_grp_name)
86         self.assertEqual(Direction.ingress.value, settings.direction.value)
87         self.assertEqual(Protocol.null.value, settings.protocol.value)
88
89     def test_proto_null(self):
90         settings = SecurityGroupRuleSettings(
91             **{'sec_grp_name': 'foo', 'direction': 'ingress',
92                'protocol': 'null'})
93         self.assertEqual('foo', settings.sec_grp_name)
94         self.assertEqual(Direction.ingress.value, settings.direction.value)
95         self.assertEqual(Protocol.null.value, settings.protocol.value)
96
97     def test_all(self):
98         settings = SecurityGroupRuleSettings(
99             sec_grp_name='foo', description='fubar',
100             direction=Direction.egress, remote_group_id='rgi',
101             protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
102             port_range_max=2,
103             remote_ip_prefix='prfx')
104         self.assertEqual('foo', settings.sec_grp_name)
105         self.assertEqual('fubar', settings.description)
106         self.assertEqual(Direction.egress.value, settings.direction.value)
107         self.assertEqual('rgi', settings.remote_group_id)
108         self.assertEqual(Protocol.icmp.value, settings.protocol.value)
109         self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
110         self.assertEqual(1, settings.port_range_min)
111         self.assertEqual(2, settings.port_range_max)
112         self.assertEqual('prfx', settings.remote_ip_prefix)
113
114     def test_config_all(self):
115         settings = SecurityGroupRuleSettings(
116             **{'sec_grp_name': 'foo',
117                'description': 'fubar',
118                'direction': 'egress',
119                'remote_group_id': 'rgi',
120                'protocol': 'tcp',
121                'ethertype': 'IPv6',
122                'port_range_min': 1,
123                'port_range_max': 2,
124                'remote_ip_prefix': 'prfx'})
125         self.assertEqual('foo', settings.sec_grp_name)
126         self.assertEqual('fubar', settings.description)
127         self.assertEqual(Direction.egress.value, settings.direction.value)
128         self.assertEqual('rgi', settings.remote_group_id)
129         self.assertEqual(Protocol.tcp.value, settings.protocol.value)
130         self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
131         self.assertEqual(1, settings.port_range_min)
132         self.assertEqual(2, settings.port_range_max)
133         self.assertEqual('prfx', settings.remote_ip_prefix)
134
135
136 class SecurityGroupSettingsUnitTests(unittest.TestCase):
137     """
138     Tests the construction of the SecurityGroupSettings class
139     """
140
141     def test_no_params(self):
142         with self.assertRaises(SecurityGroupConfigError):
143             SecurityGroupSettings()
144
145     def test_empty_config(self):
146         with self.assertRaises(SecurityGroupConfigError):
147             SecurityGroupSettings(**dict())
148
149     def test_name_only(self):
150         settings = SecurityGroupSettings(name='foo')
151         self.assertEqual('foo', settings.name)
152
153     def test_config_with_name_only(self):
154         settings = SecurityGroupSettings(**{'name': 'foo'})
155         self.assertEqual('foo', settings.name)
156
157     def test_invalid_rule(self):
158         rule_setting = SecurityGroupRuleSettings(
159             sec_grp_name='bar', direction=Direction.ingress,
160             description='test_rule_1')
161         with self.assertRaises(SecurityGroupConfigError):
162             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
163
164     def test_all(self):
165         rule_settings = list()
166         rule_settings.append(SecurityGroupRuleSettings(
167             sec_grp_name='bar', direction=Direction.egress,
168             description='test_rule_1'))
169         rule_settings.append(SecurityGroupRuleSettings(
170             sec_grp_name='bar', direction=Direction.ingress,
171             description='test_rule_2'))
172         settings = SecurityGroupSettings(
173             name='bar', description='fubar', project_name='foo',
174             rule_settings=rule_settings)
175
176         self.assertEqual('bar', settings.name)
177         self.assertEqual('fubar', settings.description)
178         self.assertEqual('foo', settings.project_name)
179         self.assertEqual(rule_settings[0], settings.rule_settings[0])
180         self.assertEqual(rule_settings[1], settings.rule_settings[1])
181
182     def test_config_all(self):
183         settings = SecurityGroupSettings(
184             **{'name': 'bar',
185                'description': 'fubar',
186                'project_name': 'foo',
187                'rules': [
188                    {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
189
190         self.assertEqual('bar', settings.name)
191         self.assertEqual('fubar', settings.description)
192         self.assertEqual('foo', settings.project_name)
193         self.assertEqual(1, len(settings.rule_settings))
194         self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
195         self.assertEqual(Direction.ingress.value,
196                          settings.rule_settings[0].direction.value)
197
198
199 class CreateSecurityGroupTests(OSIntegrationTestCase):
200     """
201     Test for the CreateSecurityGroup class defined in create_security_group.py
202     """
203
204     def setUp(self):
205         """
206         Instantiates the CreateSecurityGroup object that is responsible for
207         downloading and creating an OS image file within OpenStack
208         """
209         super(self.__class__, self).__start__()
210
211         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
212         self.sec_grp_name = guid + 'name'
213         self.neutron = neutron_utils.neutron_client(self.os_creds)
214
215         # Initialize for cleanup
216         self.sec_grp_creator = None
217
218     def tearDown(self):
219         """
220         Cleans the image and downloaded image file
221         """
222         if self.sec_grp_creator:
223             self.sec_grp_creator.clean()
224
225         super(self.__class__, self).__clean__()
226
227     def test_create_group_without_rules(self):
228         """
229         Tests the creation of an OpenStack Security Group without custom rules.
230         """
231         # Create Image
232         sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
233                                                description='hello group')
234         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
235             self.os_creds, sec_grp_settings)
236         self.sec_grp_creator.create()
237
238         sec_grp = neutron_utils.get_security_group(
239             self.neutron, sec_grp_settings=sec_grp_settings)
240         self.assertIsNotNone(sec_grp)
241
242         validation_utils.objects_equivalent(
243             self.sec_grp_creator.get_security_group(), sec_grp)
244         rules = neutron_utils.get_rules_by_security_group(
245             self.neutron, self.sec_grp_creator.get_security_group())
246         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
247         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
248                                             rules)
249
250         self.assertTrue(
251             validate_sec_grp(
252                 self.neutron, self.sec_grp_creator.sec_grp_settings,
253                 self.sec_grp_creator.get_security_group()))
254
255     def test_create_group_admin_user_to_new_project(self):
256         """
257         Tests the creation of an OpenStack Security Group without custom rules.
258         """
259         # Create Image
260         sec_grp_settings = SecurityGroupConfig(
261             name=self.sec_grp_name, description='hello group',
262             project_name=self.admin_os_creds.project_name)
263         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
264             self.os_creds, sec_grp_settings)
265         self.sec_grp_creator.create()
266
267         sec_grp = neutron_utils.get_security_group(
268             self.neutron, sec_grp_settings=sec_grp_settings)
269         self.assertIsNotNone(sec_grp)
270
271         validation_utils.objects_equivalent(
272             self.sec_grp_creator.get_security_group(), sec_grp)
273         rules = neutron_utils.get_rules_by_security_group(
274             self.neutron, self.sec_grp_creator.get_security_group())
275         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
276         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
277                                             rules)
278
279         self.assertTrue(
280             validate_sec_grp(
281                 self.neutron, self.sec_grp_creator.sec_grp_settings,
282                 self.sec_grp_creator.get_security_group(), rules))
283
284     def test_create_group_new_user_to_admin_project(self):
285         """
286         Tests the creation of an OpenStack Security Group without custom rules.
287         """
288         # Create Image
289         sec_grp_settings = SecurityGroupConfig(
290             name=self.sec_grp_name, description='hello group',
291             project_name=self.os_creds.project_name)
292         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
293             self.admin_os_creds, sec_grp_settings)
294         self.sec_grp_creator.create()
295
296         sec_grp = neutron_utils.get_security_group(
297             self.neutron, sec_grp_settings=sec_grp_settings)
298         self.assertIsNotNone(sec_grp)
299
300         validation_utils.objects_equivalent(
301             self.sec_grp_creator.get_security_group(), sec_grp)
302         rules = neutron_utils.get_rules_by_security_group(
303             self.neutron, self.sec_grp_creator.get_security_group())
304         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
305         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
306                                             rules)
307
308         self.assertTrue(
309             validate_sec_grp(
310                 self.neutron, self.sec_grp_creator.sec_grp_settings,
311                 self.sec_grp_creator.get_security_group(), rules))
312
313     def test_create_delete_group(self):
314         """
315         Tests the creation of an OpenStack Security Group without custom rules.
316         """
317         # Create Image
318         sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
319                                                description='hello group')
320         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
321             self.os_creds, sec_grp_settings)
322         created_sec_grp = self.sec_grp_creator.create()
323         self.assertIsNotNone(created_sec_grp)
324
325         self.assertTrue(
326             validate_sec_grp(
327                 self.neutron, self.sec_grp_creator.sec_grp_settings,
328                 self.sec_grp_creator.get_security_group()))
329
330         neutron_utils.delete_security_group(self.neutron, created_sec_grp)
331         self.assertIsNone(neutron_utils.get_security_group(
332             self.neutron,
333             sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
334
335         self.sec_grp_creator.clean()
336
337     def test_create_group_with_one_simple_rule(self):
338         """
339         Tests the creation of an OpenStack Security Group with one simple
340         custom rule.
341         """
342         # Create Image
343         sec_grp_rule_settings = list()
344         sec_grp_rule_settings.append(
345             SecurityGroupRuleConfig(
346                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
347                 description='test_rule_1'))
348         sec_grp_settings = SecurityGroupConfig(
349             name=self.sec_grp_name, description='hello group',
350             rule_settings=sec_grp_rule_settings)
351         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
352             self.os_creds, sec_grp_settings)
353         self.sec_grp_creator.create()
354
355         sec_grp = neutron_utils.get_security_group(
356             self.neutron, sec_grp_settings=sec_grp_settings)
357         validation_utils.objects_equivalent(
358             self.sec_grp_creator.get_security_group(), sec_grp)
359         rules = neutron_utils.get_rules_by_security_group(
360             self.neutron, self.sec_grp_creator.get_security_group())
361         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
362         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
363                                             rules)
364
365         self.assertTrue(
366             validate_sec_grp(
367                 self.neutron, self.sec_grp_creator.sec_grp_settings,
368                 self.sec_grp_creator.get_security_group(), rules))
369
370     def test_create_group_with_one_complex_rule(self):
371         """
372         Tests the creation of an OpenStack Security Group with one simple
373         custom rule.
374         """
375         # Create Image
376         sec_grp_rule_settings = list()
377         sec_grp_rule_settings.append(
378             SecurityGroupRuleConfig(
379                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
380                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
381                 port_range_min=10, port_range_max=20,
382                 description='test_rule_1'))
383         sec_grp_settings = SecurityGroupConfig(
384             name=self.sec_grp_name, description='hello group',
385             rule_settings=sec_grp_rule_settings)
386         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
387             self.os_creds, sec_grp_settings)
388         self.sec_grp_creator.create()
389
390         sec_grp = neutron_utils.get_security_group(
391             self.neutron, sec_grp_settings=sec_grp_settings)
392         validation_utils.objects_equivalent(
393             self.sec_grp_creator.get_security_group(), sec_grp)
394         rules = neutron_utils.get_rules_by_security_group(
395             self.neutron, self.sec_grp_creator.get_security_group())
396         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
397         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
398                                             rules)
399
400         self.assertTrue(
401             validate_sec_grp(
402                 self.neutron, self.sec_grp_creator.sec_grp_settings,
403                 self.sec_grp_creator.get_security_group(), rules))
404
405     def test_create_group_with_several_rules(self):
406         """
407         Tests the creation of an OpenStack Security Group with one simple
408         custom rule.
409         """
410         # Create Image
411         sec_grp_rule_settings = list()
412         sec_grp_rule_settings.append(
413             SecurityGroupRuleConfig(
414                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
415                 description='test_rule_1'))
416         sec_grp_rule_settings.append(
417             SecurityGroupRuleConfig(
418                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
419                 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
420                 description='test_rule_2'))
421         sec_grp_rule_settings.append(
422             SecurityGroupRuleConfig(
423                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
424                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
425                 port_range_min=10, port_range_max=20,
426                 description='test_rule_3'))
427         sec_grp_settings = SecurityGroupConfig(
428             name=self.sec_grp_name, description='hello group',
429             rule_settings=sec_grp_rule_settings)
430         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
431             self.os_creds, sec_grp_settings)
432         self.sec_grp_creator.create()
433
434         sec_grp = neutron_utils.get_security_group(
435             self.neutron, sec_grp_settings=sec_grp_settings)
436         validation_utils.objects_equivalent(
437             self.sec_grp_creator.get_security_group(), sec_grp)
438         rules = neutron_utils.get_rules_by_security_group(
439             self.neutron, self.sec_grp_creator.get_security_group())
440         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
441         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
442                                             rules)
443
444         self.assertTrue(
445             validate_sec_grp(
446                 self.neutron, self.sec_grp_creator.sec_grp_settings,
447                 self.sec_grp_creator.get_security_group(), rules))
448
449     def test_add_rule(self):
450         """
451         Tests the creation of an OpenStack Security Group with one simple
452         custom rule then adds one after creation.
453         """
454         # Create Image
455         sec_grp_rule_settings = list()
456         sec_grp_rule_settings.append(
457             SecurityGroupRuleConfig(
458                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
459                 description='test_rule_1'))
460         sec_grp_settings = SecurityGroupConfig(
461             name=self.sec_grp_name, description='hello group',
462             rule_settings=sec_grp_rule_settings)
463         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
464             self.os_creds, sec_grp_settings)
465         self.sec_grp_creator.create()
466
467         sec_grp = neutron_utils.get_security_group(
468             self.neutron, sec_grp_settings=sec_grp_settings)
469         validation_utils.objects_equivalent(
470             self.sec_grp_creator.get_security_group(), sec_grp)
471
472         rules = neutron_utils.get_rules_by_security_group(
473             self.neutron, self.sec_grp_creator.get_security_group())
474
475         self.assertTrue(
476             validate_sec_grp(
477                 self.neutron, self.sec_grp_creator.sec_grp_settings,
478                 self.sec_grp_creator.get_security_group(), rules))
479
480         rules = neutron_utils.get_rules_by_security_group(
481             self.neutron, self.sec_grp_creator.get_security_group())
482         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
483         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
484                                             rules)
485
486         self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
487             sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
488             direction=Direction.egress, protocol=Protocol.icmp,
489             description='test_rule_2'))
490         rules2 = neutron_utils.get_rules_by_security_group(
491             self.neutron, self.sec_grp_creator.get_security_group())
492         self.assertEqual(len(rules) + 1, len(rules2))
493
494     def test_remove_rule_by_id(self):
495         """
496         Tests the creation of an OpenStack Security Group with two simple
497         custom rules then removes one by the rule ID.
498         """
499         # Create Image
500         sec_grp_rule_settings = list()
501         sec_grp_rule_settings.append(
502             SecurityGroupRuleConfig(
503                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
504                 description='test_rule_1'))
505         sec_grp_rule_settings.append(
506             SecurityGroupRuleConfig(
507                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
508                 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
509                 description='test_rule_2'))
510         sec_grp_rule_settings.append(
511             SecurityGroupRuleConfig(
512                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
513                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
514                 port_range_min=10, port_range_max=20,
515                 description='test_rule_3'))
516         sec_grp_settings = SecurityGroupConfig(
517             name=self.sec_grp_name, description='hello group',
518             rule_settings=sec_grp_rule_settings)
519         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
520             self.os_creds, sec_grp_settings)
521         self.sec_grp_creator.create()
522
523         sec_grp = neutron_utils.get_security_group(
524             self.neutron, sec_grp_settings=sec_grp_settings)
525         validation_utils.objects_equivalent(
526             self.sec_grp_creator.get_security_group(), sec_grp)
527         rules = neutron_utils.get_rules_by_security_group(
528             self.neutron, self.sec_grp_creator.get_security_group())
529         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
530         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
531                                             rules)
532
533         self.assertTrue(
534             validate_sec_grp(
535                 self.neutron, self.sec_grp_creator.sec_grp_settings,
536                 self.sec_grp_creator.get_security_group(), rules))
537
538         self.sec_grp_creator.remove_rule(
539             rule_id=rules[0].id)
540         rules_after_del = neutron_utils.get_rules_by_security_group(
541             self.neutron,
542             self.sec_grp_creator.get_security_group())
543         self.assertEqual(len(rules) - 1, len(rules_after_del))
544
545     def test_remove_rule_by_setting(self):
546         """
547         Tests the creation of an OpenStack Security Group with two simple
548         custom rules then removes one by the rule setting object
549         """
550         # Create Image
551         sec_grp_rule_settings = list()
552         sec_grp_rule_settings.append(
553             SecurityGroupRuleConfig(
554                 sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
555                 description='test_rule_1'))
556         sec_grp_rule_settings.append(
557             SecurityGroupRuleConfig(
558                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
559                 protocol=Protocol.udp, ethertype=Ethertype.IPv6,
560                 description='test_rule_2'))
561         sec_grp_rule_settings.append(
562             SecurityGroupRuleConfig(
563                 sec_grp_name=self.sec_grp_name, direction=Direction.egress,
564                 protocol=Protocol.udp, ethertype=Ethertype.IPv4,
565                 port_range_min=10, port_range_max=20,
566                 description='test_rule_3'))
567         sec_grp_settings = SecurityGroupConfig(
568             name=self.sec_grp_name, description='hello group',
569             rule_settings=sec_grp_rule_settings)
570         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
571             self.os_creds, sec_grp_settings)
572         self.sec_grp_creator.create()
573
574         sec_grp = neutron_utils.get_security_group(
575             self.neutron, sec_grp_settings=sec_grp_settings)
576         validation_utils.objects_equivalent(
577             self.sec_grp_creator.get_security_group(), sec_grp)
578
579         rules = neutron_utils.get_rules_by_security_group(
580             self.neutron, self.sec_grp_creator.get_security_group())
581         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
582         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
583                                             rules)
584
585         self.assertTrue(
586             validate_sec_grp(
587                 self.neutron, self.sec_grp_creator.sec_grp_settings,
588                 self.sec_grp_creator.get_security_group(), rules))
589
590         self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
591         rules_after_del = neutron_utils.get_rules_by_security_group(
592             self.neutron,
593             self.sec_grp_creator.get_security_group())
594         self.assertEqual(len(rules) - 1, len(rules_after_del))
595
596
597 def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
598     """
599     Returns True is the settings on a security group are properly contained
600     on the SNAPS SecurityGroup domain object
601     :param neutron: the neutron client
602     :param sec_grp_settings: the security group configuration
603     :param sec_grp: the SNAPS-OO security group object
604     :param rules: collection of SNAPS-OO security group rule objects
605     :return: T/F
606     """
607     return (sec_grp.description == sec_grp_settings.description and
608             sec_grp.name == sec_grp_settings.name and
609             validate_sec_grp_rules(
610                 neutron, sec_grp_settings.rule_settings, rules))
611
612
613 def validate_sec_grp_rules(neutron, rule_settings, rules):
614     """
615     Returns True is the settings on a security group rule are properly
616     contained on the SNAPS SecurityGroupRule domain object.
617     This function will only operate on rules that contain a description as
618     this is the only means to tell if the rule is custom or defaulted by
619     OpenStack
620     :param neutron: the neutron client
621     :param rule_settings: collection of SecurityGroupRuleConfig objects
622     :param rules: a collection of SecurityGroupRule domain objects
623     :return: T/F
624     """
625
626     for rule_setting in rule_settings:
627         if rule_setting.description:
628             match = False
629             for rule in rules:
630                 sec_grp = neutron_utils.get_security_group(
631                     neutron, sec_grp_name=rule_setting.sec_grp_name)
632
633                 setting_eth_type = create_security_group.Ethertype.IPv4
634                 if rule_setting.ethertype:
635                     setting_eth_type = rule_setting.ethertype
636
637                 if not sec_grp:
638                     return False
639
640                 proto_str = 'null'
641                 if rule.protocol:
642                     proto_str = rule.protocol
643
644                 if (rule.description == rule_setting.description and
645                     rule.direction == rule_setting.direction.name and
646                     rule.ethertype == setting_eth_type.name and
647                     rule.port_range_max == rule_setting.port_range_max and
648                     rule.port_range_min == rule_setting.port_range_min and
649                     proto_str == str(rule_setting.protocol.value) and
650                     rule.remote_group_id == rule_setting.remote_group_id and
651                     rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
652                         rule.security_group_id == sec_grp.id):
653                     match = True
654                     break
655
656             if not match:
657                 return False
658
659     return True