1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (SecurityGroupSettings,
20 SecurityGroupRuleSettings,
23 SecurityGroupRuleSettingsError,
24 SecurityGroupSettingsError)
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils
29 __author__ = 'spisarski'
32 class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
34 Tests the construction of the SecurityGroupRuleSettings class
37 def test_no_params(self):
38 with self.assertRaises(SecurityGroupRuleSettingsError):
39 SecurityGroupRuleSettings()
41 def test_empty_config(self):
42 with self.assertRaises(SecurityGroupRuleSettingsError):
43 SecurityGroupRuleSettings(**dict())
45 def test_name_only(self):
46 with self.assertRaises(SecurityGroupRuleSettingsError):
47 SecurityGroupRuleSettings(sec_grp_name='foo')
49 def test_config_with_name_only(self):
50 with self.assertRaises(SecurityGroupRuleSettingsError):
51 SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
53 def test_name_and_direction(self):
54 settings = SecurityGroupRuleSettings(sec_grp_name='foo',
55 direction=Direction.ingress)
56 self.assertEqual('foo', settings.sec_grp_name)
57 self.assertEqual(Direction.ingress, settings.direction)
59 def test_config_name_and_direction(self):
60 settings = SecurityGroupRuleSettings(
61 **{'sec_grp_name': 'foo', 'direction': 'ingress'})
62 self.assertEqual('foo', settings.sec_grp_name)
63 self.assertEqual(Direction.ingress, settings.direction)
66 settings = SecurityGroupRuleSettings(
67 sec_grp_name='foo', description='fubar',
68 direction=Direction.egress, remote_group_id='rgi',
69 protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
71 remote_ip_prefix='prfx')
72 self.assertEqual('foo', settings.sec_grp_name)
73 self.assertEqual('fubar', settings.description)
74 self.assertEqual(Direction.egress, settings.direction)
75 self.assertEqual('rgi', settings.remote_group_id)
76 self.assertEqual(Protocol.icmp, settings.protocol)
77 self.assertEqual(Ethertype.IPv6, settings.ethertype)
78 self.assertEqual(1, settings.port_range_min)
79 self.assertEqual(2, settings.port_range_max)
80 self.assertEqual('prfx', settings.remote_ip_prefix)
82 def test_config_all(self):
83 settings = SecurityGroupRuleSettings(
84 **{'sec_grp_name': 'foo',
85 'description': 'fubar',
86 'direction': 'egress',
87 'remote_group_id': 'rgi',
92 'remote_ip_prefix': 'prfx'})
93 self.assertEqual('foo', settings.sec_grp_name)
94 self.assertEqual('fubar', settings.description)
95 self.assertEqual(Direction.egress, settings.direction)
96 self.assertEqual('rgi', settings.remote_group_id)
97 self.assertEqual(Protocol.tcp, settings.protocol)
98 self.assertEqual(Ethertype.IPv6, settings.ethertype)
99 self.assertEqual(1, settings.port_range_min)
100 self.assertEqual(2, settings.port_range_max)
101 self.assertEqual('prfx', settings.remote_ip_prefix)
104 class SecurityGroupSettingsUnitTests(unittest.TestCase):
106 Tests the construction of the SecurityGroupSettings class
109 def test_no_params(self):
110 with self.assertRaises(SecurityGroupSettingsError):
111 SecurityGroupSettings()
113 def test_empty_config(self):
114 with self.assertRaises(SecurityGroupSettingsError):
115 SecurityGroupSettings(**dict())
117 def test_name_only(self):
118 settings = SecurityGroupSettings(name='foo')
119 self.assertEqual('foo', settings.name)
121 def test_config_with_name_only(self):
122 settings = SecurityGroupSettings(**{'name': 'foo'})
123 self.assertEqual('foo', settings.name)
125 def test_invalid_rule(self):
126 rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
127 direction=Direction.ingress)
128 with self.assertRaises(SecurityGroupSettingsError):
129 SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
132 rule_settings = list()
133 rule_settings.append(SecurityGroupRuleSettings(
134 sec_grp_name='bar', direction=Direction.egress))
135 rule_settings.append(SecurityGroupRuleSettings(
136 sec_grp_name='bar', direction=Direction.ingress))
137 settings = SecurityGroupSettings(
138 name='bar', description='fubar', project_name='foo',
139 rule_settings=rule_settings)
141 self.assertEqual('bar', settings.name)
142 self.assertEqual('fubar', settings.description)
143 self.assertEqual('foo', settings.project_name)
144 self.assertEqual(rule_settings[0], settings.rule_settings[0])
145 self.assertEqual(rule_settings[1], settings.rule_settings[1])
147 def test_config_all(self):
148 settings = SecurityGroupSettings(
150 'description': 'fubar',
151 'project_name': 'foo',
153 {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
155 self.assertEqual('bar', settings.name)
156 self.assertEqual('fubar', settings.description)
157 self.assertEqual('foo', settings.project_name)
158 self.assertEqual(1, len(settings.rule_settings))
159 self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
160 self.assertEqual(Direction.ingress,
161 settings.rule_settings[0].direction)
164 class CreateSecurityGroupTests(OSIntegrationTestCase):
166 Test for the CreateSecurityGroup class defined in create_security_group.py
171 Instantiates the CreateSecurityGroup object that is responsible for
172 downloading and creating an OS image file within OpenStack
174 super(self.__class__, self).__start__()
176 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
177 self.sec_grp_name = guid + 'name'
178 self.neutron = neutron_utils.neutron_client(self.os_creds)
180 # Initialize for cleanup
181 self.sec_grp_creator = None
185 Cleans the image and downloaded image file
187 if self.sec_grp_creator:
188 self.sec_grp_creator.clean()
190 super(self.__class__, self).__clean__()
192 def test_create_group_without_rules(self):
194 Tests the creation of an OpenStack Security Group without custom rules.
197 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
198 description='hello group')
199 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
200 self.os_creds, sec_grp_settings)
201 self.sec_grp_creator.create()
203 sec_grp = neutron_utils.get_security_group(self.neutron,
205 self.assertIsNotNone(sec_grp)
207 validation_utils.objects_equivalent(
208 self.sec_grp_creator.get_security_group(), sec_grp)
209 rules = neutron_utils.get_rules_by_security_group(
210 self.neutron, self.sec_grp_creator.get_security_group())
211 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
212 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
215 def test_create_delete_group(self):
217 Tests the creation of an OpenStack Security Group without custom rules.
220 sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
221 description='hello group')
222 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
223 self.os_creds, sec_grp_settings)
224 created_sec_grp = self.sec_grp_creator.create()
225 self.assertIsNotNone(created_sec_grp)
227 neutron_utils.delete_security_group(self.neutron, created_sec_grp)
228 self.assertIsNone(neutron_utils.get_security_group(
229 self.neutron, self.sec_grp_creator.sec_grp_settings.name))
231 self.sec_grp_creator.clean()
233 def test_create_group_with_one_simple_rule(self):
235 Tests the creation of an OpenStack Security Group with one simple
239 sec_grp_rule_settings = list()
240 sec_grp_rule_settings.append(
241 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
242 direction=Direction.ingress))
243 sec_grp_settings = SecurityGroupSettings(
244 name=self.sec_grp_name, description='hello group',
245 rule_settings=sec_grp_rule_settings)
246 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
247 self.os_creds, sec_grp_settings)
248 self.sec_grp_creator.create()
250 sec_grp = neutron_utils.get_security_group(self.neutron,
252 validation_utils.objects_equivalent(
253 self.sec_grp_creator.get_security_group(), sec_grp)
254 rules = neutron_utils.get_rules_by_security_group(
255 self.neutron, self.sec_grp_creator.get_security_group())
256 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
257 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
260 def test_create_group_with_several_rules(self):
262 Tests the creation of an OpenStack Security Group with one simple
266 sec_grp_rule_settings = list()
267 sec_grp_rule_settings.append(
268 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
269 direction=Direction.ingress))
270 sec_grp_rule_settings.append(
271 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
272 direction=Direction.egress,
273 protocol=Protocol.udp,
274 ethertype=Ethertype.IPv6))
275 sec_grp_rule_settings.append(
276 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
277 direction=Direction.egress,
278 protocol=Protocol.udp,
279 ethertype=Ethertype.IPv4,
282 sec_grp_settings = SecurityGroupSettings(
283 name=self.sec_grp_name, description='hello group',
284 rule_settings=sec_grp_rule_settings)
285 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
286 self.os_creds, sec_grp_settings)
287 self.sec_grp_creator.create()
289 sec_grp = neutron_utils.get_security_group(self.neutron,
291 validation_utils.objects_equivalent(
292 self.sec_grp_creator.get_security_group(), sec_grp)
293 rules = neutron_utils.get_rules_by_security_group(
294 self.neutron, self.sec_grp_creator.get_security_group())
295 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
296 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
299 def test_add_rule(self):
301 Tests the creation of an OpenStack Security Group with one simple
302 custom rule then adds one after creation.
305 sec_grp_rule_settings = list()
306 sec_grp_rule_settings.append(
307 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
308 direction=Direction.ingress))
309 sec_grp_settings = SecurityGroupSettings(
310 name=self.sec_grp_name, description='hello group',
311 rule_settings=sec_grp_rule_settings)
312 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
313 self.os_creds, sec_grp_settings)
314 self.sec_grp_creator.create()
316 sec_grp = neutron_utils.get_security_group(self.neutron,
318 validation_utils.objects_equivalent(
319 self.sec_grp_creator.get_security_group(), sec_grp)
320 rules = neutron_utils.get_rules_by_security_group(
321 self.neutron, self.sec_grp_creator.get_security_group())
322 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
323 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
326 self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
327 sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
328 direction=Direction.egress, protocol=Protocol.icmp))
329 rules2 = neutron_utils.get_rules_by_security_group(
330 self.neutron, self.sec_grp_creator.get_security_group())
331 self.assertEqual(len(rules) + 1, len(rules2))
333 def test_remove_rule_by_id(self):
335 Tests the creation of an OpenStack Security Group with two simple
336 custom rules then removes one by the rule ID.
339 sec_grp_rule_settings = list()
340 sec_grp_rule_settings.append(
341 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
342 direction=Direction.ingress))
343 sec_grp_rule_settings.append(
344 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
345 direction=Direction.egress,
346 protocol=Protocol.udp,
347 ethertype=Ethertype.IPv6))
348 sec_grp_rule_settings.append(
349 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
350 direction=Direction.egress,
351 protocol=Protocol.udp,
352 ethertype=Ethertype.IPv4,
355 sec_grp_settings = SecurityGroupSettings(
356 name=self.sec_grp_name, description='hello group',
357 rule_settings=sec_grp_rule_settings)
358 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
359 self.os_creds, sec_grp_settings)
360 self.sec_grp_creator.create()
362 sec_grp = neutron_utils.get_security_group(self.neutron,
364 validation_utils.objects_equivalent(
365 self.sec_grp_creator.get_security_group(), sec_grp)
366 rules = neutron_utils.get_rules_by_security_group(
367 self.neutron, self.sec_grp_creator.get_security_group())
368 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
369 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
372 self.sec_grp_creator.remove_rule(
374 rules_after_del = neutron_utils.get_rules_by_security_group(
376 self.sec_grp_creator.get_security_group())
377 self.assertEqual(len(rules) - 1, len(rules_after_del))
379 def test_remove_rule_by_setting(self):
381 Tests the creation of an OpenStack Security Group with two simple
382 custom rules then removes one by the rule setting object
385 sec_grp_rule_settings = list()
386 sec_grp_rule_settings.append(
387 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
388 direction=Direction.ingress))
389 sec_grp_rule_settings.append(
390 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
391 direction=Direction.egress,
392 protocol=Protocol.udp,
393 ethertype=Ethertype.IPv6))
394 sec_grp_rule_settings.append(
395 SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
396 direction=Direction.egress,
397 protocol=Protocol.udp,
398 ethertype=Ethertype.IPv4,
401 sec_grp_settings = SecurityGroupSettings(
402 name=self.sec_grp_name, description='hello group',
403 rule_settings=sec_grp_rule_settings)
404 self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
405 self.os_creds, sec_grp_settings)
406 self.sec_grp_creator.create()
408 sec_grp = neutron_utils.get_security_group(self.neutron,
410 validation_utils.objects_equivalent(
411 self.sec_grp_creator.get_security_group(), sec_grp)
412 rules = neutron_utils.get_rules_by_security_group(
413 self.neutron, self.sec_grp_creator.get_security_group())
414 self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
415 validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
418 self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
419 rules_after_del = neutron_utils.get_rules_by_security_group(
421 self.sec_grp_creator.get_security_group())
422 self.assertEqual(len(rules) - 1, len(rules_after_del))
424 # TODO - Add more tests with different rules. Rule creation parameters can be