1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.openstack import create_security_group
19 from snaps.openstack.create_security_group import (SecurityGroupSettings,
20 SecurityGroupRuleSettings,
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
25 from snaps.openstack.utils import neutron_utils
27 __author__ = 'spisarski'
class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupRuleSettings objects, both from
    keyword arguments and from the equivalent configuration dictionaries
    """

    def test_no_params(self):
        """
        A rule built without any arguments must be rejected
        """
        self.assertRaises(Exception, SecurityGroupRuleSettings)

    def test_empty_config(self):
        """
        A rule built from an empty configuration must be rejected
        """
        empty_config = {}
        self.assertRaises(
            Exception, SecurityGroupRuleSettings, **empty_config)

    def test_name_only(self):
        """
        A security group name on its own is not enough to build a rule
        """
        self.assertRaises(
            Exception, SecurityGroupRuleSettings, sec_grp_name='foo')

    def test_config_with_name_only(self):
        """
        A configuration holding only the group name must be rejected
        """
        config = {'sec_grp_name': 'foo'}
        self.assertRaises(Exception, SecurityGroupRuleSettings, **config)

    def test_name_and_direction(self):
        """
        Group name plus direction is accepted and both values are exposed
        as attributes
        """
        rule = SecurityGroupRuleSettings(
            direction=Direction.ingress, sec_grp_name='foo')

        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_config_name_and_direction(self):
        """
        Same as test_name_and_direction but driven by a configuration
        dictionary, where the direction is given as a string
        """
        config = {'sec_grp_name': 'foo', 'direction': 'ingress'}
        rule = SecurityGroupRuleSettings(**config)

        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual(Direction.ingress, rule.direction)

    def test_all(self):
        """
        All keyword arguments exercised here are exposed unchanged as
        attributes of the resulting rule
        """
        rule = SecurityGroupRuleSettings(
            sec_grp_name='foo',
            description='fubar',
            direction=Direction.egress,
            remote_group_id='rgi',
            protocol=Protocol.icmp,
            ethertype=Ethertype.IPv6,
            port_range_min=1,
            port_range_max=2,
            remote_ip_prefix='prfx')

        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress, rule.direction)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(Protocol.icmp, rule.protocol)
        self.assertEqual(Ethertype.IPv6, rule.ethertype)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)

    def test_config_all(self):
        """
        A fully populated configuration dictionary yields a rule whose
        string values have been turned into the matching enum members
        """
        # NOTE(review): the protocol, ethertype and port range entries are
        # inferred from the assertions below - confirm the exact spelling
        config = {
            'sec_grp_name': 'foo',
            'description': 'fubar',
            'direction': 'egress',
            'remote_group_id': 'rgi',
            'protocol': 'tcp',
            'ethertype': 'IPv6',
            'port_range_min': 1,
            'port_range_max': 2,
            'remote_ip_prefix': 'prfx',
        }
        rule = SecurityGroupRuleSettings(**config)

        self.assertEqual('foo', rule.sec_grp_name)
        self.assertEqual('fubar', rule.description)
        self.assertEqual(Direction.egress, rule.direction)
        self.assertEqual('rgi', rule.remote_group_id)
        self.assertEqual(Protocol.tcp, rule.protocol)
        self.assertEqual(Ethertype.IPv6, rule.ethertype)
        self.assertEqual(1, rule.port_range_min)
        self.assertEqual(2, rule.port_range_max)
        self.assertEqual('prfx', rule.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for building SecurityGroupSettings objects, both from
    keyword arguments and from the equivalent configuration dictionaries
    """

    def test_no_params(self):
        """
        A group built without any arguments must be rejected
        """
        self.assertRaises(Exception, SecurityGroupSettings)

    def test_empty_config(self):
        """
        A group built from an empty configuration must be rejected
        """
        empty_config = {}
        self.assertRaises(Exception, SecurityGroupSettings, **empty_config)

    def test_name_only(self):
        """
        A name is sufficient to build a group and is exposed as an
        attribute
        """
        group = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', group.name)

    def test_config_with_name_only(self):
        """
        Same as test_name_only but driven by a configuration dictionary
        """
        config = {'name': 'foo'}
        group = SecurityGroupSettings(**config)
        self.assertEqual('foo', group.name)

    def test_invalid_rule(self):
        """
        A rule that names group 'bar' cannot be attached to a group named
        'foo'
        """
        mismatched_rule = SecurityGroupRuleSettings(
            direction=Direction.ingress, sec_grp_name='bar')

        self.assertRaises(
            Exception, SecurityGroupSettings,
            name='foo', rule_settings=[mismatched_rule])

    def test_all(self):
        """
        Name, description, project name and rule objects are all exposed
        unchanged, with the rules kept in the order they were supplied
        """
        rule_settings = [
            SecurityGroupRuleSettings(
                sec_grp_name='bar', direction=Direction.egress),
            SecurityGroupRuleSettings(
                sec_grp_name='bar', direction=Direction.ingress),
        ]
        group = SecurityGroupSettings(
            name='bar',
            description='fubar',
            project_name='foo',
            rule_settings=rule_settings)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(rule_settings[0], group.rule_settings[0])
        self.assertEqual(rule_settings[1], group.rule_settings[1])

    def test_config_all(self):
        """
        A configuration dictionary with nested rule dictionaries yields a
        group whose rules have been turned into rule settings objects
        """
        # NOTE(review): the 'name' and 'rule_settings' keys are inferred
        # from the assertions below - confirm the exact spelling
        config = {
            'name': 'bar',
            'description': 'fubar',
            'project_name': 'foo',
            'rule_settings': [
                {'sec_grp_name': 'bar', 'direction': 'ingress'},
            ],
        }
        group = SecurityGroupSettings(**config)

        self.assertEqual('bar', group.name)
        self.assertEqual('fubar', group.description)
        self.assertEqual('foo', group.project_name)
        self.assertEqual(1, len(group.rule_settings))

        only_rule = group.rule_settings[0]
        self.assertEqual('bar', only_rule.sec_grp_name)
        self.assertEqual(Direction.ingress, only_rule.direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Integration tests for the OpenStackSecurityGroup creator from
    create_security_group.py
    """

    def setUp(self):
        """
        Runs the parent fixture start-up, then prepares a unique security
        group name and the neutron client the tests verify against
        """
        # Name the class explicitly: super(self.__class__, self) resolves
        # to this very method again as soon as the test case is subclassed,
        # which recurses until the interpreter gives up
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Cleans the security group creator, when a test got far enough to
        make one, then runs the parent fixture clean-up
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _create_group(self, rule_settings=None):
        """
        Creates a security group named self.sec_grp_name.

        The creator is stored on self.sec_grp_creator before create() is
        called so tearDown can still clean up when create() fails.

        :param rule_settings: optional list of SecurityGroupRuleSettings
        :return: whatever OpenStackSecurityGroup.create() returns
        """
        settings_kwargs = {
            'name': self.sec_grp_name,
            'description': 'hello group',
        }
        # Forward rule_settings only when a test supplied some, so the
        # rule-less tests keep relying on the SecurityGroupSettings default
        if rule_settings is not None:
            settings_kwargs['rule_settings'] = rule_settings

        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
            self.os_creds, SecurityGroupSettings(**settings_kwargs))
        return self.sec_grp_creator.create()

    def _three_rule_settings(self):
        """
        Returns the list of three custom rules shared by several tests:
        one ingress rule and two egress UDP rules (IPv6 and IPv4)
        """
        return [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv6),
            # NOTE(review): the reviewed listing breaks off after
            # 'ethertype' for this rule; further arguments (e.g. a port
            # range) may be missing - confirm against version control
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.egress,
                                      protocol=Protocol.udp,
                                      ethertype=Ethertype.IPv4),
        ]

    def _current_rules(self):
        """
        Returns the rules neutron currently reports for the created group
        """
        return neutron_utils.get_rules_by_security_group(
            self.neutron, self.sec_grp_creator.get_security_group())

    def _check_against_neutron(self, require_group=False):
        """
        Compares the creator's view of the group and its rules with what
        neutron reports.

        :param require_group: when True, additionally assert that neutron
                              returned a group for self.sec_grp_name
        :return: the rules neutron reports for the group
        """
        # NOTE(review): the lookup argument is assumed to be the group name
        # generated in setUp - confirm
        sec_grp = neutron_utils.get_security_group(self.neutron,
                                                   self.sec_grp_name)
        if require_group:
            self.assertIsNotNone(sec_grp)

        # NOTE(review): the value returned by objects_equivalent() is never
        # asserted; if it signals a mismatch by returning False rather than
        # raising, these two comparisons verify nothing - confirm and wrap
        # them in assertTrue
        validation_utils.objects_equivalent(
            self.sec_grp_creator.get_security_group(), sec_grp)
        rules = self._current_rules()
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
                                            rules)
        return rules

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom
        rules.
        """
        self._create_group()
        self._check_against_neutron(require_group=True)

    def test_create_delete_group(self):
        """
        Tests that a group deleted directly through neutron can no longer
        be looked up by name and that the creator's clean() can still be
        called afterwards.
        """
        created_sec_grp = self._create_group()
        self.assertIsNotNone(created_sec_grp)

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(
            self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule.
        """
        self._create_group([
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress),
        ])
        self._check_against_neutron()

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with three
        custom rules.
        """
        self._create_group(self._three_rule_settings())
        self._check_against_neutron()

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple
        custom rule then adds one after creation.
        """
        self._create_group([
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
                                      direction=Direction.ingress),
        ])
        rules = self._check_against_neutron()

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
            direction=Direction.egress, protocol=Protocol.icmp))
        rules2 = self._current_rules()
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three
        custom rules then removes one by the rule ID.
        """
        self._create_group(self._three_rule_settings())
        rules = self._check_against_neutron()

        # NOTE(review): the argument of this call is not visible in the
        # reviewed listing; the 'rule_id' keyword and the dict layout of a
        # neutron rule are assumptions - confirm against remove_rule() and
        # get_rules_by_security_group()
        self.sec_grp_creator.remove_rule(
            rule_id=rules[0]['security_group_rule']['id'])
        rules_after_del = self._current_rules()
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three
        custom rules then removes one by the rule setting object
        """
        rule_settings = self._three_rule_settings()
        self._create_group(rule_settings)
        rules = self._check_against_neutron()

        # Pass the very object that was used at creation time
        self.sec_grp_creator.remove_rule(rule_setting=rule_settings[0])
        rules_after_del = self._current_rules()
        self.assertEqual(len(rules) - 1, len(rules_after_del))
422 # TODO - Add more tests with different rules. Rule creation parameters can be