7cae62b6af5aba054697a4901b3e332746d779c1
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (
20     SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
21     Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
22 from snaps.openstack.tests import validation_utils
23 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
24 from snaps.openstack.utils import neutron_utils
25
26 __author__ = 'spisarski'
27
28
29 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
30     """
31     Tests the construction of the SecurityGroupRuleSettings class
32     """
33
34     def test_no_params(self):
35         with self.assertRaises(SecurityGroupRuleSettingsError):
36             SecurityGroupRuleSettings()
37
38     def test_empty_config(self):
39         with self.assertRaises(SecurityGroupRuleSettingsError):
40             SecurityGroupRuleSettings(**dict())
41
42     def test_name_only(self):
43         with self.assertRaises(SecurityGroupRuleSettingsError):
44             SecurityGroupRuleSettings(sec_grp_name='foo')
45
46     def test_config_with_name_only(self):
47         with self.assertRaises(SecurityGroupRuleSettingsError):
48             SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
49
50     def test_name_and_direction(self):
51         settings = SecurityGroupRuleSettings(sec_grp_name='foo',
52                                              direction=Direction.ingress)
53         self.assertEqual('foo', settings.sec_grp_name)
54         self.assertEqual(Direction.ingress, settings.direction)
55
56     def test_config_name_and_direction(self):
57         settings = SecurityGroupRuleSettings(
58             **{'sec_grp_name': 'foo', 'direction': 'ingress'})
59         self.assertEqual('foo', settings.sec_grp_name)
60         self.assertEqual(Direction.ingress, settings.direction)
61
62     def test_all(self):
63         settings = SecurityGroupRuleSettings(
64             sec_grp_name='foo', description='fubar',
65             direction=Direction.egress, remote_group_id='rgi',
66             protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
67             port_range_max=2,
68             remote_ip_prefix='prfx')
69         self.assertEqual('foo', settings.sec_grp_name)
70         self.assertEqual('fubar', settings.description)
71         self.assertEqual(Direction.egress, settings.direction)
72         self.assertEqual('rgi', settings.remote_group_id)
73         self.assertEqual(Protocol.icmp, settings.protocol)
74         self.assertEqual(Ethertype.IPv6, settings.ethertype)
75         self.assertEqual(1, settings.port_range_min)
76         self.assertEqual(2, settings.port_range_max)
77         self.assertEqual('prfx', settings.remote_ip_prefix)
78
79     def test_config_all(self):
80         settings = SecurityGroupRuleSettings(
81             **{'sec_grp_name': 'foo',
82                'description': 'fubar',
83                'direction': 'egress',
84                'remote_group_id': 'rgi',
85                'protocol': 'tcp',
86                'ethertype': 'IPv6',
87                'port_range_min': 1,
88                'port_range_max': 2,
89                'remote_ip_prefix': 'prfx'})
90         self.assertEqual('foo', settings.sec_grp_name)
91         self.assertEqual('fubar', settings.description)
92         self.assertEqual(Direction.egress, settings.direction)
93         self.assertEqual('rgi', settings.remote_group_id)
94         self.assertEqual(Protocol.tcp, settings.protocol)
95         self.assertEqual(Ethertype.IPv6, settings.ethertype)
96         self.assertEqual(1, settings.port_range_min)
97         self.assertEqual(2, settings.port_range_max)
98         self.assertEqual('prfx', settings.remote_ip_prefix)
99
100
101 class SecurityGroupSettingsUnitTests(unittest.TestCase):
102     """
103     Tests the construction of the SecurityGroupSettings class
104     """
105
106     def test_no_params(self):
107         with self.assertRaises(SecurityGroupSettingsError):
108             SecurityGroupSettings()
109
110     def test_empty_config(self):
111         with self.assertRaises(SecurityGroupSettingsError):
112             SecurityGroupSettings(**dict())
113
114     def test_name_only(self):
115         settings = SecurityGroupSettings(name='foo')
116         self.assertEqual('foo', settings.name)
117
118     def test_config_with_name_only(self):
119         settings = SecurityGroupSettings(**{'name': 'foo'})
120         self.assertEqual('foo', settings.name)
121
122     def test_invalid_rule(self):
123         rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
124                                                  direction=Direction.ingress)
125         with self.assertRaises(SecurityGroupSettingsError):
126             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
127
128     def test_all(self):
129         rule_settings = list()
130         rule_settings.append(SecurityGroupRuleSettings(
131             sec_grp_name='bar', direction=Direction.egress))
132         rule_settings.append(SecurityGroupRuleSettings(
133             sec_grp_name='bar', direction=Direction.ingress))
134         settings = SecurityGroupSettings(
135             name='bar', description='fubar', project_name='foo',
136             rule_settings=rule_settings)
137
138         self.assertEqual('bar', settings.name)
139         self.assertEqual('fubar', settings.description)
140         self.assertEqual('foo', settings.project_name)
141         self.assertEqual(rule_settings[0], settings.rule_settings[0])
142         self.assertEqual(rule_settings[1], settings.rule_settings[1])
143
144     def test_config_all(self):
145         settings = SecurityGroupSettings(
146             **{'name': 'bar',
147                'description': 'fubar',
148                'project_name': 'foo',
149                'rules': [
150                    {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
151
152         self.assertEqual('bar', settings.name)
153         self.assertEqual('fubar', settings.description)
154         self.assertEqual('foo', settings.project_name)
155         self.assertEqual(1, len(settings.rule_settings))
156         self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
157         self.assertEqual(Direction.ingress,
158                          settings.rule_settings[0].direction)
159
160
161 class CreateSecurityGroupTests(OSIntegrationTestCase):
162     """
163     Test for the CreateSecurityGroup class defined in create_security_group.py
164     """
165
166     def setUp(self):
167         """
168         Instantiates the CreateSecurityGroup object that is responsible for
169         downloading and creating an OS image file within OpenStack
170         """
171         super(self.__class__, self).__start__()
172
173         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
174         self.sec_grp_name = guid + 'name'
175         self.neutron = neutron_utils.neutron_client(self.os_creds)
176
177         # Initialize for cleanup
178         self.sec_grp_creator = None
179
180     def tearDown(self):
181         """
182         Cleans the image and downloaded image file
183         """
184         if self.sec_grp_creator:
185             self.sec_grp_creator.clean()
186
187         super(self.__class__, self).__clean__()
188
189     def test_create_group_without_rules(self):
190         """
191         Tests the creation of an OpenStack Security Group without custom rules.
192         """
193         # Create Image
194         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
195                                                  description='hello group')
196         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
197             self.os_creds, sec_grp_settings)
198         self.sec_grp_creator.create()
199
200         sec_grp = neutron_utils.get_security_group(self.neutron,
201                                                    self.sec_grp_name)
202         self.assertIsNotNone(sec_grp)
203
204         validation_utils.objects_equivalent(
205             self.sec_grp_creator.get_security_group(), sec_grp)
206         rules = neutron_utils.get_rules_by_security_group(
207             self.neutron, self.sec_grp_creator.get_security_group())
208         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
209         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
210                                             rules)
211
212     def test_create_group_admin_user_to_new_project(self):
213         """
214         Tests the creation of an OpenStack Security Group without custom rules.
215         """
216         # Create Image
217         sec_grp_settings = SecurityGroupSettings(
218             name=self.sec_grp_name, description='hello group',
219             project_name=self.admin_os_creds.project_name)
220         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
221             self.os_creds, sec_grp_settings)
222         self.sec_grp_creator.create()
223
224         sec_grp = neutron_utils.get_security_group(self.neutron,
225                                                    self.sec_grp_name)
226         self.assertIsNotNone(sec_grp)
227
228         validation_utils.objects_equivalent(
229             self.sec_grp_creator.get_security_group(), sec_grp)
230         rules = neutron_utils.get_rules_by_security_group(
231             self.neutron, self.sec_grp_creator.get_security_group())
232         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
233         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
234                                             rules)
235
236     def test_create_group_new_user_to_admin_project(self):
237         """
238         Tests the creation of an OpenStack Security Group without custom rules.
239         """
240         # Create Image
241         sec_grp_settings = SecurityGroupSettings(
242             name=self.sec_grp_name, description='hello group',
243             project_name=self.os_creds.project_name)
244         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
245             self.admin_os_creds, sec_grp_settings)
246         self.sec_grp_creator.create()
247
248         sec_grp = neutron_utils.get_security_group(self.neutron,
249                                                    self.sec_grp_name)
250         self.assertIsNotNone(sec_grp)
251
252         validation_utils.objects_equivalent(
253             self.sec_grp_creator.get_security_group(), sec_grp)
254         rules = neutron_utils.get_rules_by_security_group(
255             self.neutron, self.sec_grp_creator.get_security_group())
256         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
257         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
258                                             rules)
259
260     def test_create_delete_group(self):
261         """
262         Tests the creation of an OpenStack Security Group without custom rules.
263         """
264         # Create Image
265         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
266                                                  description='hello group')
267         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
268             self.os_creds, sec_grp_settings)
269         created_sec_grp = self.sec_grp_creator.create()
270         self.assertIsNotNone(created_sec_grp)
271
272         neutron_utils.delete_security_group(self.neutron, created_sec_grp)
273         self.assertIsNone(neutron_utils.get_security_group(
274             self.neutron, self.sec_grp_creator.sec_grp_settings.name))
275
276         self.sec_grp_creator.clean()
277
278     def test_create_group_with_one_simple_rule(self):
279         """
280         Tests the creation of an OpenStack Security Group with one simple
281         custom rule.
282         """
283         # Create Image
284         sec_grp_rule_settings = list()
285         sec_grp_rule_settings.append(
286             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
287                                       direction=Direction.ingress))
288         sec_grp_settings = SecurityGroupSettings(
289             name=self.sec_grp_name, description='hello group',
290             rule_settings=sec_grp_rule_settings)
291         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
292             self.os_creds, sec_grp_settings)
293         self.sec_grp_creator.create()
294
295         sec_grp = neutron_utils.get_security_group(self.neutron,
296                                                    self.sec_grp_name)
297         validation_utils.objects_equivalent(
298             self.sec_grp_creator.get_security_group(), sec_grp)
299         rules = neutron_utils.get_rules_by_security_group(
300             self.neutron, self.sec_grp_creator.get_security_group())
301         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
302         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
303                                             rules)
304
305     def test_create_group_with_several_rules(self):
306         """
307         Tests the creation of an OpenStack Security Group with one simple
308         custom rule.
309         """
310         # Create Image
311         sec_grp_rule_settings = list()
312         sec_grp_rule_settings.append(
313             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
314                                       direction=Direction.ingress))
315         sec_grp_rule_settings.append(
316             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
317                                       direction=Direction.egress,
318                                       protocol=Protocol.udp,
319                                       ethertype=Ethertype.IPv6))
320         sec_grp_rule_settings.append(
321             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
322                                       direction=Direction.egress,
323                                       protocol=Protocol.udp,
324                                       ethertype=Ethertype.IPv4,
325                                       port_range_min=10,
326                                       port_range_max=20))
327         sec_grp_settings = SecurityGroupSettings(
328             name=self.sec_grp_name, description='hello group',
329             rule_settings=sec_grp_rule_settings)
330         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
331             self.os_creds, sec_grp_settings)
332         self.sec_grp_creator.create()
333
334         sec_grp = neutron_utils.get_security_group(self.neutron,
335                                                    self.sec_grp_name)
336         validation_utils.objects_equivalent(
337             self.sec_grp_creator.get_security_group(), sec_grp)
338         rules = neutron_utils.get_rules_by_security_group(
339             self.neutron, self.sec_grp_creator.get_security_group())
340         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
341         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
342                                             rules)
343
344     def test_add_rule(self):
345         """
346         Tests the creation of an OpenStack Security Group with one simple
347         custom rule then adds one after creation.
348         """
349         # Create Image
350         sec_grp_rule_settings = list()
351         sec_grp_rule_settings.append(
352             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
353                                       direction=Direction.ingress))
354         sec_grp_settings = SecurityGroupSettings(
355             name=self.sec_grp_name, description='hello group',
356             rule_settings=sec_grp_rule_settings)
357         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
358             self.os_creds, sec_grp_settings)
359         self.sec_grp_creator.create()
360
361         sec_grp = neutron_utils.get_security_group(self.neutron,
362                                                    self.sec_grp_name)
363         validation_utils.objects_equivalent(
364             self.sec_grp_creator.get_security_group(), sec_grp)
365         rules = neutron_utils.get_rules_by_security_group(
366             self.neutron, self.sec_grp_creator.get_security_group())
367         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
368         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
369                                             rules)
370
371         self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
372             sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
373             direction=Direction.egress, protocol=Protocol.icmp))
374         rules2 = neutron_utils.get_rules_by_security_group(
375             self.neutron, self.sec_grp_creator.get_security_group())
376         self.assertEqual(len(rules) + 1, len(rules2))
377
378     def test_remove_rule_by_id(self):
379         """
380         Tests the creation of an OpenStack Security Group with two simple
381         custom rules then removes one by the rule ID.
382         """
383         # Create Image
384         sec_grp_rule_settings = list()
385         sec_grp_rule_settings.append(
386             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
387                                       direction=Direction.ingress))
388         sec_grp_rule_settings.append(
389             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
390                                       direction=Direction.egress,
391                                       protocol=Protocol.udp,
392                                       ethertype=Ethertype.IPv6))
393         sec_grp_rule_settings.append(
394             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
395                                       direction=Direction.egress,
396                                       protocol=Protocol.udp,
397                                       ethertype=Ethertype.IPv4,
398                                       port_range_min=10,
399                                       port_range_max=20))
400         sec_grp_settings = SecurityGroupSettings(
401             name=self.sec_grp_name, description='hello group',
402             rule_settings=sec_grp_rule_settings)
403         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
404             self.os_creds, sec_grp_settings)
405         self.sec_grp_creator.create()
406
407         sec_grp = neutron_utils.get_security_group(self.neutron,
408                                                    self.sec_grp_name)
409         validation_utils.objects_equivalent(
410             self.sec_grp_creator.get_security_group(), sec_grp)
411         rules = neutron_utils.get_rules_by_security_group(
412             self.neutron, self.sec_grp_creator.get_security_group())
413         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
414         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
415                                             rules)
416
417         self.sec_grp_creator.remove_rule(
418             rule_id=rules[0].id)
419         rules_after_del = neutron_utils.get_rules_by_security_group(
420             self.neutron,
421             self.sec_grp_creator.get_security_group())
422         self.assertEqual(len(rules) - 1, len(rules_after_del))
423
424     def test_remove_rule_by_setting(self):
425         """
426         Tests the creation of an OpenStack Security Group with two simple
427         custom rules then removes one by the rule setting object
428         """
429         # Create Image
430         sec_grp_rule_settings = list()
431         sec_grp_rule_settings.append(
432             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
433                                       direction=Direction.ingress))
434         sec_grp_rule_settings.append(
435             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
436                                       direction=Direction.egress,
437                                       protocol=Protocol.udp,
438                                       ethertype=Ethertype.IPv6))
439         sec_grp_rule_settings.append(
440             SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
441                                       direction=Direction.egress,
442                                       protocol=Protocol.udp,
443                                       ethertype=Ethertype.IPv4,
444                                       port_range_min=10,
445                                       port_range_max=20))
446         sec_grp_settings = SecurityGroupSettings(
447             name=self.sec_grp_name, description='hello group',
448             rule_settings=sec_grp_rule_settings)
449         self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
450             self.os_creds, sec_grp_settings)
451         self.sec_grp_creator.create()
452
453         sec_grp = neutron_utils.get_security_group(self.neutron,
454                                                    self.sec_grp_name)
455         validation_utils.objects_equivalent(
456             self.sec_grp_creator.get_security_group(), sec_grp)
457         rules = neutron_utils.get_rules_by_security_group(
458             self.neutron, self.sec_grp_creator.get_security_group())
459         self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
460         validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
461                                             rules)
462
463         self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
464         rules_after_del = neutron_utils.get_rules_by_security_group(
465             self.neutron,
466             self.sec_grp_creator.get_security_group())
467         self.assertEqual(len(rules) - 1, len(rules_after_del))
468
469 # TODO - Add more tests with different rules. Rule creation parameters can be
470 # somewhat complex