Merge "Adding INFO and LICENSE for review"
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16 import unittest
17
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \
20     Ethertype, Protocol
21 from snaps.openstack.tests import validation_utils
22 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
23 from snaps.openstack.utils import neutron_utils
24
25 __author__ = 'spisarski'
26
27
28 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
29     """
30     Tests the construction of the SecurityGroupRuleSettings class
31     """
32
33     def test_no_params(self):
34         with self.assertRaises(Exception):
35             SecurityGroupRuleSettings()
36
37     def test_empty_config(self):
38         with self.assertRaises(Exception):
39             SecurityGroupRuleSettings(config=dict())
40
41     def test_name_only(self):
42         with self.assertRaises(Exception):
43             SecurityGroupRuleSettings(sec_grp_name='foo')
44
45     def test_config_with_name_only(self):
46         with self.assertRaises(Exception):
47             SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'})
48
49     def test_name_and_direction(self):
50         settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
51         self.assertEquals('foo', settings.sec_grp_name)
52         self.assertEquals(Direction.ingress, settings.direction)
53
54     def test_config_name_and_direction(self):
55         settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
56         self.assertEquals('foo', settings.sec_grp_name)
57         self.assertEquals(Direction.ingress, settings.direction)
58
59     def test_all(self):
60         settings = SecurityGroupRuleSettings(
61             sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
62             protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
63             remote_ip_prefix='prfx')
64         self.assertEquals('foo', settings.sec_grp_name)
65         self.assertEquals('fubar', settings.description)
66         self.assertEquals(Direction.egress, settings.direction)
67         self.assertEquals('rgi', settings.remote_group_id)
68         self.assertEquals(Protocol.icmp, settings.protocol)
69         self.assertEquals(Ethertype.IPv6, settings.ethertype)
70         self.assertEquals(1, settings.port_range_min)
71         self.assertEquals(2, settings.port_range_max)
72         self.assertEquals('prfx', settings.remote_ip_prefix)
73
74     def test_config_all(self):
75         settings = SecurityGroupRuleSettings(
76             config={'sec_grp_name': 'foo',
77                     'description': 'fubar',
78                     'direction': 'egress',
79                     'remote_group_id': 'rgi',
80                     'protocol': 'tcp',
81                     'ethertype': 'IPv6',
82                     'port_range_min': 1,
83                     'port_range_max': 2,
84                     'remote_ip_prefix': 'prfx'})
85         self.assertEquals('foo', settings.sec_grp_name)
86         self.assertEquals('fubar', settings.description)
87         self.assertEquals(Direction.egress, settings.direction)
88         self.assertEquals('rgi', settings.remote_group_id)
89         self.assertEquals(Protocol.tcp, settings.protocol)
90         self.assertEquals(Ethertype.IPv6, settings.ethertype)
91         self.assertEquals(1, settings.port_range_min)
92         self.assertEquals(2, settings.port_range_max)
93         self.assertEquals('prfx', settings.remote_ip_prefix)
94
95
96 class SecurityGroupSettingsUnitTests(unittest.TestCase):
97     """
98     Tests the construction of the SecurityGroupSettings class
99     """
100
101     def test_no_params(self):
102         with self.assertRaises(Exception):
103             SecurityGroupSettings()
104
105     def test_empty_config(self):
106         with self.assertRaises(Exception):
107             SecurityGroupSettings(config=dict())
108
109     def test_name_only(self):
110         settings = SecurityGroupSettings(name='foo')
111         self.assertEquals('foo', settings.name)
112
113     def test_config_with_name_only(self):
114         settings = SecurityGroupSettings(config={'name': 'foo'})
115         self.assertEquals('foo', settings.name)
116
117     def test_invalid_rule(self):
118         rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
119         with self.assertRaises(Exception):
120             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
121
122     def test_all(self):
123         rule_settings = list()
124         rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress))
125         rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress))
126         settings = SecurityGroupSettings(
127             name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
128
129         self.assertEquals('bar', settings.name)
130         self.assertEquals('fubar', settings.description)
131         self.assertEquals('foo', settings.project_name)
132         self.assertEquals(rule_settings[0], settings.rule_settings[0])
133         self.assertEquals(rule_settings[1], settings.rule_settings[1])
134
135     def test_config_all(self):
136         settings = SecurityGroupSettings(
137             config={'name': 'bar',
138                     'description': 'fubar',
139                     'project_name': 'foo',
140                     'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
141
142         self.assertEquals('bar', settings.name)
143         self.assertEquals('fubar', settings.description)
144         self.assertEquals('foo', settings.project_name)
145         self.assertEquals(1, len(settings.rule_settings))
146         self.assertEquals('bar', settings.rule_settings[0].sec_grp_name)
147         self.assertEquals(Direction.ingress, settings.rule_settings[0].direction)
148
149
150 class CreateSecurityGroupTests(OSIntegrationTestCase):
151     """
152     Test for the CreateSecurityGroup class defined in create_security_group.py
153     """
154
155     def setUp(self):
156         """
157         Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file
158         within OpenStack
159         """
160         super(self.__class__, self).__start__()
161
162         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
163         self.sec_grp_name = guid + 'name'
164         self.neutron = neutron_utils.neutron_client(self.os_creds)
165
166         # Initialize for cleanup
167         self.sec_grp_creator = None
168
169     def tearDown(self):
170         """
171         Cleans the image and downloaded image file
172         """
173         if self.sec_grp_creator:
174             self.sec_grp_creator.clean()
175
176         super(self.__class__, self).__clean__()
177
178     def test_create_group_without_rules(self):
179         """
180         Tests the creation of an OpenStack Security Group without custom rules.
181         """
182         # Create Image
183         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
184         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
185         self.sec_grp_creator.create()
186
187         sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
188         self.assertIsNotNone(sec_grp)
189
190         validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
191         rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
192         self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
193         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
194
195     def test_create_delete_group(self):
196         """
197         Tests the creation of an OpenStack Security Group without custom rules.
198         """
199         # Create Image
200         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
201         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
202         created_sec_grp = self.sec_grp_creator.create()
203         self.assertIsNotNone(created_sec_grp)
204
205         neutron_utils.delete_security_group(self.neutron, created_sec_grp)
206         self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name))
207
208         self.sec_grp_creator.clean()
209
210     def test_create_group_with_one_simple_rule(self):
211         """
212         Tests the creation of an OpenStack Security Group with one simple custom rule.
213         """
214         # Create Image
215         sec_grp_rule_settings = list()
216         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
217                                                                direction=Direction.ingress))
218         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
219                                                  rule_settings=sec_grp_rule_settings)
220         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
221         self.sec_grp_creator.create()
222
223         sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
224         validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
225         rules = neutron_utils.get_rules_by_security_group(self.neutron,
226                                                           self.sec_grp_creator.get_security_group())
227         self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
228         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
229
230     def test_create_group_with_several_rules(self):
231         """
232         Tests the creation of an OpenStack Security Group with one simple custom rule.
233         """
234         # Create Image
235         sec_grp_rule_settings = list()
236         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
237                                                                direction=Direction.ingress))
238         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
239                                                                direction=Direction.egress,
240                                                                protocol=Protocol.udp,
241                                                                ethertype=Ethertype.IPv6))
242         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
243                                                                direction=Direction.egress,
244                                                                protocol=Protocol.udp,
245                                                                ethertype=Ethertype.IPv4,
246                                                                port_range_min=10,
247                                                                port_range_max=20))
248         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
249                                                  rule_settings=sec_grp_rule_settings)
250         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
251         self.sec_grp_creator.create()
252
253         sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
254         validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
255         rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
256         self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
257         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
258
259     def test_add_rule(self):
260         """
261         Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation.
262         """
263         # Create Image
264         sec_grp_rule_settings = list()
265         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
266                                                                direction=Direction.ingress))
267         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
268                                                  rule_settings=sec_grp_rule_settings)
269         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
270         self.sec_grp_creator.create()
271
272         sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
273         validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
274         rules = neutron_utils.get_rules_by_security_group(self.neutron,
275                                                           self.sec_grp_creator.get_security_group())
276         self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
277         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
278
279         self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
280                                                                 direction=Direction.egress, protocol=Protocol.icmp))
281         rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
282         self.assertEquals(len(rules) + 1, len(rules2))
283
284     def test_remove_rule_by_id(self):
285         """
286         Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID.
287         """
288         # Create Image
289         sec_grp_rule_settings = list()
290         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
291                                                                direction=Direction.ingress))
292         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
293                                                                direction=Direction.egress,
294                                                                protocol=Protocol.udp,
295                                                                ethertype=Ethertype.IPv6))
296         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
297                                                                direction=Direction.egress,
298                                                                protocol=Protocol.udp,
299                                                                ethertype=Ethertype.IPv4,
300                                                                port_range_min=10,
301                                                                port_range_max=20))
302         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
303                                                  rule_settings=sec_grp_rule_settings)
304         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
305         self.sec_grp_creator.create()
306
307         sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
308         validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
309         rules = neutron_utils.get_rules_by_security_group(self.neutron,
310                                                           self.sec_grp_creator.get_security_group())
311         self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
312         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
313
314         self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
315         rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
316                                                                     self.sec_grp_creator.get_security_group())
317         self.assertEquals(len(rules) - 1, len(rules_after_del))
318
319     def test_remove_rule_by_setting(self):
320         """
321         Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule
322         setting object
323         """
324         # Create Image
325         sec_grp_rule_settings = list()
326         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
327                                                                direction=Direction.ingress))
328         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
329                                                                direction=Direction.egress,
330                                                                protocol=Protocol.udp,
331                                                                ethertype=Ethertype.IPv6))
332         sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
333                                                                direction=Direction.egress,
334                                                                protocol=Protocol.udp,
335                                                                ethertype=Ethertype.IPv4,
336                                                                port_range_min=10,
337                                                                port_range_max=20))
338         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
339                                                  rule_settings=sec_grp_rule_settings)
340         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
341         self.sec_grp_creator.create()
342
343         sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
344         validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
345         rules = neutron_utils.get_rules_by_security_group(self.neutron,
346                                                           self.sec_grp_creator.get_security_group())
347         self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
348         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
349
350         self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
351         rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
352                                                                     self.sec_grp_creator.get_security_group())
353         self.assertEquals(len(rules) - 1, len(rules_after_del))
354
355 # TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex