Default OSCreds cacert attribute to False.
[snaps.git] / snaps / openstack / tests / create_security_group_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (SecurityGroupSettings,
20                                                    SecurityGroupRuleSettings,
21                                                    Direction, Ethertype,
22                                                    Protocol)
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
25 from snaps.openstack.utils import neutron_utils
26
27 __author__ = 'spisarski'
28
29
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupRuleSettings objects, both from
    keyword arguments and from an unpacked configuration dict
    """

    def test_no_params(self):
        # Construction without any argument is expected to raise
        self.assertRaises(Exception, SecurityGroupRuleSettings)

    def test_empty_config(self):
        # An empty configuration dict is expected to raise as well
        empty_config = {}
        self.assertRaises(Exception, SecurityGroupRuleSettings,
                          **empty_config)

    def test_name_only(self):
        # A group name without a direction is expected to raise
        self.assertRaises(Exception, SecurityGroupRuleSettings,
                          sec_grp_name='foo')

    def test_config_with_name_only(self):
        # Same as test_name_only but driven by a configuration dict
        config = {'sec_grp_name': 'foo'}
        self.assertRaises(Exception, SecurityGroupRuleSettings, **config)

    def test_name_and_direction(self):
        rule = SecurityGroupRuleSettings(direction=Direction.ingress,
                                         sec_grp_name='foo')
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_config_name_and_direction(self):
        # The direction is supplied as a string and compared to the enum
        config = {'sec_grp_name': 'foo', 'direction': 'ingress'}
        rule = SecurityGroupRuleSettings(**config)
        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_all(self):
        rule = SecurityGroupRuleSettings(
            sec_grp_name='foo', description='fubar',
            direction=Direction.egress, remote_group_id='rgi',
            protocol=Protocol.icmp, ethertype=Ethertype.IPv6,
            port_range_min=1, port_range_max=2, remote_ip_prefix='prfx')

        # Attribute name paired with the value it must hold, checked in
        # declaration order
        expected = (
            ('sec_grp_name', 'foo'),
            ('description', 'fubar'),
            ('direction', Direction.egress),
            ('remote_group_id', 'rgi'),
            ('protocol', Protocol.icmp),
            ('ethertype', Ethertype.IPv6),
            ('port_range_min', 1),
            ('port_range_max', 2),
            ('remote_ip_prefix', 'prfx'),
        )
        for attr_name, value in expected:
            self.assertEqual(value, getattr(rule, attr_name))

    def test_config_all(self):
        # Enum-backed fields are given as plain strings in the config
        config = {
            'sec_grp_name': 'foo',
            'description': 'fubar',
            'direction': 'egress',
            'remote_group_id': 'rgi',
            'protocol': 'tcp',
            'ethertype': 'IPv6',
            'port_range_min': 1,
            'port_range_max': 2,
            'remote_ip_prefix': 'prfx',
        }
        rule = SecurityGroupRuleSettings(**config)

        expected = (
            ('sec_grp_name', 'foo'),
            ('description', 'fubar'),
            ('direction', Direction.egress),
            ('remote_group_id', 'rgi'),
            ('protocol', Protocol.tcp),
            ('ethertype', Ethertype.IPv6),
            ('port_range_min', 1),
            ('port_range_max', 2),
            ('remote_ip_prefix', 'prfx'),
        )
        for attr_name, value in expected:
            self.assertEqual(value, getattr(rule, attr_name))
100
101
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupSettings objects, both from keyword
    arguments and from an unpacked configuration dict
    """

    def test_no_params(self):
        # Construction without any argument is expected to raise
        self.assertRaises(Exception, SecurityGroupSettings)

    def test_empty_config(self):
        # An empty configuration dict is expected to raise as well
        empty_config = {}
        self.assertRaises(Exception, SecurityGroupSettings, **empty_config)

    def test_name_only(self):
        group = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', group.name)

    def test_config_with_name_only(self):
        config = {'name': 'foo'}
        group = SecurityGroupSettings(**config)
        self.assertEqual('foo', group.name)

    def test_invalid_rule(self):
        # The rule names group 'bar' while the group itself is named 'foo';
        # building the group with that rule is expected to raise
        mismatched_rule = SecurityGroupRuleSettings(
            direction=Direction.ingress, sec_grp_name='bar')
        self.assertRaises(Exception, SecurityGroupSettings,
                          name='foo', rule_settings=[mismatched_rule])

    def test_all(self):
        rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name='bar',
                                      direction=Direction.egress),
            SecurityGroupRuleSettings(sec_grp_name='bar',
                                      direction=Direction.ingress),
        ]
        group = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo',
            rule_settings=rule_settings)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        # Rules must come back in the order they were supplied
        for index in (0, 1):
            self.assertEqual(rule_settings[index],
                             group.rule_settings[index])

    def test_config_all(self):
        # In the dict form the rules are listed under the 'rules' key
        config = {
            'name': 'bar',
            'description': 'fubar',
            'project_name': 'foo',
            'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}],
        }
        group = SecurityGroupSettings(**config)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(1, len(group.rule_settings))
        only_rule = group.rule_settings[0]
        self.assertEqual('bar', only_rule.sec_grp_name)
        self.assertEqual(Direction.ingress, only_rule.direction)
160
161
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Integration tests for the OpenStackSecurityGroup creator class defined in
    create_security_group.py
    """

    def setUp(self):
        """
        Calls the parent's __start__(), generates a unique security group
        name for this test and creates the neutron client used to validate
        the results
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group creator (when one was instantiated) and
        then calls the parent's __clean__()
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _create_sec_grp(self, rule_settings=None):
        """
        Builds the settings for the security group under test, stores the
        resulting creator on self.sec_grp_creator (so tearDown can clean it)
        and creates the group
        :param rule_settings: optional list of SecurityGroupRuleSettings;
                              when None the 'rule_settings' argument is not
                              passed to SecurityGroupSettings at all
        :return: the value returned by the creator's create() method
        """
        settings_kwargs = {'name': self.sec_grp_name,
                           'description': 'hello group'}
        if rule_settings is not None:
            settings_kwargs['rule_settings'] = rule_settings

        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, SecurityGroupSettings(**settings_kwargs))
        return self.sec_grp_creator.create()

    def _several_rule_settings(self):
        """
        Returns the three rule settings shared by the multi-rule tests: one
        ingress rule, one egress UDP/IPv6 rule and one egress UDP/IPv4 rule
        restricted to ports 10-20
        :return: list of SecurityGroupRuleSettings
        """
        return [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv6),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv4,
                                      port_range_min=10,
                                      port_range_max=20),
        ]

    def _validate_sec_grp(self):
        """
        Looks the security group up by name through neutron_utils and checks
        it, and its rules, against what the creator reports
        :return: the rules retrieved through neutron_utils
        """
        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        self.assertIsNotNone(sec_grp)

        # NOTE(review): the result of objects_equivalent() is discarded here
        # and below - presumably it returns a bool that should be asserted;
        # confirm against validation_utils before tightening
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)
        return rules

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        self._create_sec_grp()
        self._validate_sec_grp()

    def test_create_delete_group(self):
        """
        Creates an OpenStack Security Group without custom rules, deletes it
        directly through neutron_utils, verifies it can no longer be found by
        name and then calls clean() on the creator of the deleted group.
        """
        created_sec_grp = self._create_sec_grp()
        self.assertIsNotNone(created_sec_grp)

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress)]
        self._create_sec_grp(rule_settings)
        self._validate_sec_grp()

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules.
        """
        self._create_sec_grp(self._several_rule_settings())
        self._validate_sec_grp()

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress)]
        self._create_sec_grp(rule_settings)
        rules = self._validate_sec_grp()

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp))
        rules2 = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule ID.
        """
        self._create_sec_grp(self._several_rule_settings())
        rules = self._validate_sec_grp()

        self.sec_grp_creator.remove_rule(rule_id=rules[0].id)
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom
        rules then removes one by the rule setting object
        """
        rule_settings = self._several_rule_settings()
        self._create_sec_grp(rule_settings)
        rules = self._validate_sec_grp()

        # Remove using the very settings object the group was created with
        self.sec_grp_creator.remove_rule(rule_setting=rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))
421
422 # TODO - Add more tests with different rules. Rule creation parameters can be
423 # somewhat complex