dd28d7dc752c6d111fc9281e8ee64dd20282bc81
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (SecurityGroupSettings,
20                                                    SecurityGroupRuleSettings,
21                                                    Direction, Ethertype,
22                                                    Protocol,
23                                                    SecurityGroupRuleSettingsError,
24                                                    SecurityGroupSettingsError)
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils
28
29 __author__ = 'spisarski'
30
31
32 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
33     """
34     Tests the construction of the SecurityGroupRuleSettings class
35     """
36
37     def test_no_params(self):
38         with self.assertRaises(SecurityGroupRuleSettingsError):
39             SecurityGroupRuleSettings()
40
41     def test_empty_config(self):
42         with self.assertRaises(SecurityGroupRuleSettingsError):
43             SecurityGroupRuleSettings(**dict())
44
45     def test_name_only(self):
46         with self.assertRaises(SecurityGroupRuleSettingsError):
47             SecurityGroupRuleSettings(sec_grp_name='foo')
48
49     def test_config_with_name_only(self):
50         with self.assertRaises(SecurityGroupRuleSettingsError):
51             SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
52
53     def test_name_and_direction(self):
54         settings = SecurityGroupRuleSettings(sec_grp_name='foo',
55                                              direction=Direction.ingress)
56         self.assertEqual('foo', settings.sec_grp_name)
57         self.assertEqual(Direction.ingress, settings.direction)
58
59     def test_config_name_and_direction(self):
60         settings = SecurityGroupRuleSettings(
61             **{'sec_grp_name': 'foo', 'direction': 'ingress'})
62         self.assertEqual('foo', settings.sec_grp_name)
63         self.assertEqual(Direction.ingress, settings.direction)
64
65     def test_all(self):
66         settings = SecurityGroupRuleSettings(
67             sec_grp_name='foo', description='fubar',
68             direction=Direction.egress, remote_group_id='rgi',
69             protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
70             port_range_max=2,
71             remote_ip_prefix='prfx')
72         self.assertEqual('foo', settings.sec_grp_name)
73         self.assertEqual('fubar', settings.description)
74         self.assertEqual(Direction.egress, settings.direction)
75         self.assertEqual('rgi', settings.remote_group_id)
76         self.assertEqual(Protocol.icmp, settings.protocol)
77         self.assertEqual(Ethertype.IPv6, settings.ethertype)
78         self.assertEqual(1, settings.port_range_min)
79         self.assertEqual(2, settings.port_range_max)
80         self.assertEqual('prfx', settings.remote_ip_prefix)
81
82     def test_config_all(self):
83         settings = SecurityGroupRuleSettings(
84             **{'sec_grp_name': 'foo',
85                'description': 'fubar',
86                'direction': 'egress',
87                'remote_group_id': 'rgi',
88                'protocol': 'tcp',
89                'ethertype': 'IPv6',
90                'port_range_min': 1,
91                'port_range_max': 2,
92                'remote_ip_prefix': 'prfx'})
93         self.assertEqual('foo', settings.sec_grp_name)
94         self.assertEqual('fubar', settings.description)
95         self.assertEqual(Direction.egress, settings.direction)
96         self.assertEqual('rgi', settings.remote_group_id)
97         self.assertEqual(Protocol.tcp, settings.protocol)
98         self.assertEqual(Ethertype.IPv6, settings.ethertype)
99         self.assertEqual(1, settings.port_range_min)
100         self.assertEqual(2, settings.port_range_max)
101         self.assertEqual('prfx', settings.remote_ip_prefix)
102
103
104 class SecurityGroupSettingsUnitTests(unittest.TestCase):
105     """
106     Tests the construction of the SecurityGroupSettings class
107     """
108
109     def test_no_params(self):
110         with self.assertRaises(SecurityGroupSettingsError):
111             SecurityGroupSettings()
112
113     def test_empty_config(self):
114         with self.assertRaises(SecurityGroupSettingsError):
115             SecurityGroupSettings(**dict())
116
117     def test_name_only(self):
118         settings = SecurityGroupSettings(name='foo')
119         self.assertEqual('foo', settings.name)
120
121     def test_config_with_name_only(self):
122         settings = SecurityGroupSettings(**{'name': 'foo'})
123         self.assertEqual('foo', settings.name)
124
125     def test_invalid_rule(self):
126         rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
127                                                  direction=Direction.ingress)
128         with self.assertRaises(SecurityGroupSettingsError):
129             SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
130
131     def test_all(self):
132         rule_settings = list()
133         rule_settings.append(SecurityGroupRuleSettings(
134             sec_grp_name='bar', direction=Direction.egress))
135         rule_settings.append(SecurityGroupRuleSettings(
136             sec_grp_name='bar', direction=Direction.ingress))
137         settings = SecurityGroupSettings(
138             name='bar', description='fubar', project_name='foo',
139             rule_settings=rule_settings)
140
141         self.assertEqual('bar', settings.name)
142         self.assertEqual('fubar', settings.description)
143         self.assertEqual('foo', settings.project_name)
144         self.assertEqual(rule_settings[0], settings.rule_settings[0])
145         self.assertEqual(rule_settings[1], settings.rule_settings[1])
146
147     def test_config_all(self):
148         settings = SecurityGroupSettings(
149             **{'name': 'bar',
150                'description': 'fubar',
151                'project_name': 'foo',
152                'rules': [
153                    {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
154
155         self.assertEqual('bar', settings.name)
156         self.assertEqual('fubar', settings.description)
157         self.assertEqual('foo', settings.project_name)
158         self.assertEqual(1, len(settings.rule_settings))
159         self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
160         self.assertEqual(Direction.ingress,
161                          settings.rule_settings[0].direction)
162
163
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Integration tests for the OpenStackSecurityGroup class defined in
    create_security_group.py
    """

    def setUp(self):
        """
        Starts the integration test case, generates a unique security group
        name and creates the neutron client used to validate the results
        """
        # The class is named explicitly: super(self.__class__, self) recurses
        # infinitely as soon as this test case gets subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group when a creator has been instantiated
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _create_sec_grp(self, rule_settings=None):
        """
        Instantiates the creator for the test's security group, stores it for
        cleanup and creates the group
        :param rule_settings: optional list of SecurityGroupRuleSettings
                              objects; omitted from the group settings when
                              None
        :return: the value returned by the creator's create() method
        """
        settings_kwargs = {'name': self.sec_grp_name,
                           'description': 'hello group'}
        if rule_settings is not None:
            settings_kwargs['rule_settings'] = rule_settings

        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, SecurityGroupSettings(**settings_kwargs))
        return self.sec_grp_creator.create()

    def _three_rule_settings(self):
        """
        Builds the settings for one ingress rule and two egress UDP rules
        (IPv6, and IPv4 restricted to ports 10-20)
        :return: list of three SecurityGroupRuleSettings objects
        """
        return [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv6),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv4,
                                      port_range_min=10,
                                      port_range_max=20),
        ]

    def _validate_sec_grp(self):
        """
        Looks the security group up by name, checks that it exists and that
        neutron reports as many rules as the creator holds
        :return: the rules queried from neutron for the created group
        """
        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        self.assertIsNotNone(sec_grp)

        # NOTE(review): the results of both objects_equivalent() calls are
        # discarded; presumably they return a boolean that should be wrapped
        # in assertTrue() - confirm against validation_utils before changing
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)
        return rules

    def _query_rules(self):
        """
        Queries neutron for the current rules of the created security group
        :return: the rules returned by neutron
        """
        return neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        self._create_sec_grp()
        self._validate_sec_grp()

    def test_create_delete_group(self):
        """
        Tests that a created OpenStack Security Group can be deleted directly
        through neutron and that cleaning the creator afterwards still works.
        """
        created_sec_grp = self._create_sec_grp()
        self.assertIsNotNone(created_sec_grp)

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        # The group is already gone; clean() is called on purpose here and
        # once more by tearDown()
        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        self._create_sec_grp([
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress)])
        self._validate_sec_grp()

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        self._create_sec_grp(self._three_rule_settings())
        self._validate_sec_grp()

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        self._create_sec_grp([
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress)])
        rules = self._validate_sec_grp()

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp))
        rules2 = self._query_rules()
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        self._create_sec_grp(self._three_rule_settings())
        rules = self._validate_sec_grp()

        self.sec_grp_creator.remove_rule(rule_id=rules[0].id)
        rules_after_del = self._query_rules()
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        sec_grp_rule_settings = self._three_rule_settings()
        self._create_sec_grp(sec_grp_rule_settings)
        rules = self._validate_sec_grp()

        self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
        rules_after_del = self._query_rules()
        self.assertEqual(len(rules) - 1, len(rules_after_del))
423
424 # TODO - Add more tests with different rules. Rule creation parameters can be
425 # somewhat complex