Merge "Added custom security group with ICMP and SSH rules."
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack.utils import keystone_utils
18 from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
19 from snaps.openstack.tests import openstack_tests
20 from snaps.openstack.utils import neutron_utils
21 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
22 from snaps.openstack import create_router
23 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
24 from snaps.openstack.tests import validation_utils
25
26 __author__ = 'spisarski'
27
28 ip_1 = '10.55.1.100'
29 ip_2 = '10.55.1.200'
30
31
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can be created and can query the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and expects the external network to be among the listed networks.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listing = client.list_networks()

        # Collect every reported network name, then look for the configured external one
        net_names = [net.get('name') for net in listing.get('networks')]
        self.assertTrue(self.ext_net_name in net_names)

    def test_neutron_connect_fail(self):
        """
        Expects an Exception when a client built from bogus credentials is used to list networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            # Credentials are built inside the context so that a failure at any step satisfies the test
            bad_creds = OSCreds(username='user', password='pass', auth_url='url', project_name='project')
            client = neutron_utils.neutron_client(bad_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Expects the configured self.ext_net_name to be among the networks returned by
        neutron_utils.get_external_networks()
        :return:
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external = neutron_utils.get_external_networks(client)
        self.assertTrue(any(net['network']['name'] == self.ext_net_name for net in external))
77
78
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Tests for creating networks via neutron_utils.create_network()
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network configuration for each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects and verifies the network is really gone
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            # validate_network() only returns a boolean; it must be asserted or a leaked network goes unnoticed
            self.assertTrue(validate_network(self.neutron, self.network['network']['name'], False))

    def test_create_network(self):
        """
        Tests that neutron_utils.create_network() creates a network carrying the configured name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Tests that an Exception is raised when the network name is empty
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(self.neutron, self.os_creds,
                                                        network_settings=NetworkSettings(name=''))

    def test_create_network_null_name(self):
        """
        Tests that an Exception is raised when no network name is supplied
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(self.neutron, self.os_creds,
                                                        network_settings=NetworkSettings())
122
123
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Tests for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet configuration for each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects and verifies the network is really gone
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            # NOTE(review): the result of validate_subnet() is not checked here - confirm whether it should be
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            # validate_network() only returns a boolean; it must be asserted or a leaked network goes unnoticed
            self.assertTrue(validate_network(self.neutron, self.network['network']['name'], False))

    def _create_network(self):
        """
        Creates the configured network, stores it in self.network and asserts that it exists under the expected name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_network()

        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds, network=self.network)
        # NOTE(review): the result of validate_subnet() is not checked here - confirm whether it should be
        validate_subnet(self.neutron, subnet_settings.name, subnet_settings.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Tests that constructing SubnetSettings without a name raises an Exception
        """
        self._create_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the configured subnet and then looks up a subnet by an empty name
        """
        self._create_network()

        # NOTE(review): the subnet is created with the configured (non-empty) name, is not stored in self.subnet,
        # and the lookup result below is discarded - this test does not exercise an empty name; confirm intent
        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds, network=self.network)
        validate_subnet(self.neutron, '', subnet_settings.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Tests that an Exception is raised when the subnet CIDR value is None
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests that an Exception is raised when the subnet CIDR value is empty
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
210
211
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Tests for creating routers, router interfaces and ports via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet/router configuration for each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects and verifies the network is really gone
        """
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            # NOTE(review): the result of validate_router() is not checked here - confirm whether it should be
            validate_router(self.neutron, self.router.get('name'), False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            # NOTE(review): the result of validate_subnet() is not checked here - confirm whether it should be
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            # validate_network() only returns a boolean; it must be asserted or a leaked network goes unnoticed
            self.assertTrue(validate_network(self.neutron, self.network['network']['name'], False))

    def _create_network(self):
        """
        Creates the configured network, stores it in self.network and asserts that it exists under the expected name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def _create_subnet(self):
        """
        Creates the first configured subnet on self.network and stores it in self.subnet
        """
        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds, self.network)
        # NOTE(review): the result of validate_subnet() is not checked here - confirm whether it should be
        validate_subnet(self.neutron, subnet_settings.name, subnet_settings.cidr, True)

    def _create_router(self):
        """
        Creates the configured router and stores it in self.router
        """
        self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
        # NOTE(review): the result of validate_router() is not checked here - confirm whether it should be
        validate_router(self.neutron, self.net_config.router_settings.name, True)

    def _port_ip_addrs(self, ip):
        """
        Builds the ip_addrs value for PortSettings binding the given IP to the first configured subnet
        :param ip: the IP value to request for the port
        :return: a single element list holding the subnet name/IP dict
        """
        return [{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip}]

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the default router settings
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function when an external gateway is requested
        """
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            self.net_config.network_settings.subnet_settings[0].name,
            self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_router()
        # TODO - Add validation that the router gateway has been set

    def test_create_router_empty_name(self):
        """
        Tests that an Exception is raised when the router name is empty
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)

    def test_create_router_null_name(self):
        """
        Tests that an Exception is raised when no router name is supplied
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_network()
        self._create_subnet()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
        # NOTE(review): the result of validate_interface_router() is not checked here - confirm whether it should be
        validate_interface_router(self.interface_router, self.router, self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
        """
        self._create_network()
        self._create_subnet()

        # self.router has not been created and is therefore still None
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
        """
        self._create_network()
        self._create_router()

        # self.subnet has not been created and is therefore still None
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_network()
        self._create_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name,
                ip_addrs=self._port_ip_addrs(ip_1),
                network_name=self.net_config.network_settings.name))
        # NOTE(review): the result of validate_port() is not checked here - confirm whether it should be
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_network()
        self._create_subnet()

        # NOTE(review): despite the test name the port is created with self.port_name, not an empty name
        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._port_ip_addrs(ip_1)))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the port name value is None
        """
        self._create_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                network_name=self.net_config.network_settings.name,
                ip_addrs=self._port_ip_addrs(ip_1)))

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the network object is None
        """
        self._create_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            # NOTE(review): PortSettings is called positionally here, unlike every other test - confirm that these
            # arguments map onto the intended parameters
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                self.neutron, self.port_name, self.net_config.network_settings.name,
                ip_addrs=self._port_ip_addrs(ip_1)))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is None
        """
        self._create_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._port_ip_addrs(None)))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is not an IP address
        """
        self._create_network()
        self._create_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._port_ip_addrs('foo')))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value 10.197.123.100 is requested
        """
        self._create_network()
        self._create_subnet()

        # NOTE(review): presumably 10.197.123.100 lies outside the configured subnet CIDR - confirm
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._port_ip_addrs('10.197.123.100')))
468
469
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Tests for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron/keystone clients, a unique group name and the lists of objects to clean up
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron, security_group)
            except Exception:
                # Best-effort cleanup: the group may already have been removed by the test itself
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
        # Registered for cleanup so that a failing assertion below does not leak the group
        self.security_groups.append(security_group)

        self.assertEqual(sec_grp_settings.name, security_group['security_group']['name'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group['security_group'], sec_grp_get['security_group']))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
        that attempting to create a security group without a name will raise an exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_groups.append(
                neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function with a name and description but no rules
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
        self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))

        created = self.security_groups[0]['security_group']
        self.assertEqual(sec_grp_settings.name, created['name'])
        self.assertEqual(sec_grp_settings.description, created['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_groups[0]['security_group'], sec_grp_get['security_group']))

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and neutron_utils.create_security_group_rule() functions
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
                                                 rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))

        # Rules already attached to the new group are tracked so they are compared and cleaned up as well
        free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0])
        self.security_group_rules.extend(free_rules)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)

        rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group)

        self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, security_group['security_group']['name'])
        self.assertEqual(sec_grp_settings.description, security_group['security_group']['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group['security_group'], sec_grp_get['security_group']))

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function against two freshly created groups
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0]['security_group']['id'])
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1]['security_group']['id'])

        self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id'])
        self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id'])
591
592
593 """
594 Validation routines
595 """
596
597
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network agrees with the caller's expectation.
    Returns True when the network is found and 'exists' is true, or when the network is not found and
    'exists' is false. Returns False in the two mismatching cases.
    :param neutron: The neutron client
    :param name: The name of the network to look up
    :param exists: True when the network is expected to be found, False when it is expected to be absent
    :return: True/False
    """
    found = bool(neutron_utils.get_network(neutron, name))
    expected = bool(exists)
    return found == expected
613
614
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet agrees with the caller's expectation and, when the subnet is
    expected and found, whether its CIDR matches.
    :param neutron: The neutron client
    :param name: The name of the subnet to look up
    :param cidr: The CIDR value the subnet is expected to have
    :param exists: True when the subnet is expected to be found, False when it is expected to be absent
    :return: True when the subnet is absent as expected, the result of the CIDR comparison when it is found as
             expected, otherwise False
    """
    found_subnet = neutron_utils.get_subnet_by_name(neutron, name)

    # Nothing was found: that is only correct when the caller expected nothing
    if not found_subnet:
        return not exists

    # Something was found although the caller expected it to be absent
    if not exists:
        return False

    return found_subnet.get('cidr') == cidr
631
632
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router agrees with the caller's expectation.
    Returns True when the router is found and 'exists' is true, or when the router is not found and
    'exists' is false. Returns False in the two mismatching cases.
    :param neutron: The neutron client
    :param name: The name of the router to look up
    :param exists: True when the router is expected to be found, False when it is expected to be absent
    :return: True/False
    """
    router = neutron_utils.get_router_by_name(neutron, name)

    # Comparing the two truth values also covers the "expected absent and is absent" case, which must
    # validate successfully rather than fall through to False
    return bool(router) == bool(exists)
646
647
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into the interface router object
    :param interface_router: the object to validate; read with .get() using the keys 'id' and 'subnet_id'
    :param router: to validate against the interface_router; its 'id' value is compared
    :param subnet: to validate against the interface_router; its 'id' value is compared
    :return: True if both IDs match else False
    """
    # NOTE(review): assumes router and subnet are flat mappings exposing 'id' at the top level - confirm
    # against the callers
    subnet_id = interface_router.get('subnet_id')

    # In the Neutron "add interface to router" response the router's own ID is stored under 'id';
    # 'port_id' identifies the port created for the interface and therefore never equals the router ID
    router_id = interface_router.get('id')

    return subnet.get('id') == subnet_id and router.get('id') == router_id
660
661
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks through the ports returned by the neutron client for the one whose ID equals that of the given
    port object and checks that it has the expected name.
    :param neutron: The neutron client
    :param port_obj: The port object to look up; its ID is read from port_obj['port']['id']
    :param this_port_name: The expected port name
    :return: True when a port with the same ID is listed and its name equals this_port_name, otherwise False
    """
    # Lazily walk every listed port instance so that only the first ID match is ever considered
    candidates = (
        inst
        for port_insts in neutron.list_ports().values()
        for inst in port_insts
        if inst['id'] == port_obj['port']['id'])

    match = next(candidates, None)
    return match is not None and match['name'] == this_port_name