Added support for Glance v2
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack.utils import keystone_utils
18 from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
19 from snaps.openstack.tests import openstack_tests
20 from snaps.openstack.utils import neutron_utils
21 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
22 from snaps.openstack import create_router
23 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
24 from snaps.openstack.tests import validation_utils
25
26 __author__ = 'spisarski'
27
28 ip_1 = '10.55.1.100'  # fixed IP requested for the ports created by the port tests in NeutronUtilsRouterTests
29 ip_2 = '10.55.1.200'  # NOTE(review): not referenced in the visible part of this file - confirm it is still needed
30
31
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can be obtained and queried with the test credentials
    """

    def test_neutron_connect_success(self):
        """
        Lists the networks with the configured credentials and expects self.ext_net_name to be among them.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listed = client.list_networks().get('networks')

        network_names = [net.get('name') for net in listed]
        self.assertTrue(self.ext_net_name in network_names)

    def test_neutron_connect_fail(self):
        """
        Expects an Exception when a client built from bogus credentials is asked to list the networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bad_creds = OSCreds(username='user', password='pass', auth_url='url', project_name='project')
            client = neutron_utils.neutron_client(bad_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Expects one of the networks returned by neutron_utils.get_external_networks() to be named
        self.ext_net_name
        :return:
        """
        client = neutron_utils.neutron_client(self.os_creds)
        matched = any(ext_net['network']['name'] == self.ext_net_name
                      for ext_net in neutron_utils.get_external_networks(client))
        self.assertTrue(matched)
77
78
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Test for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network configuration; self.network stays None until a
        test creates a network so tearDown knows whether there is anything to delete
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            # NOTE(review): the result of this check is not asserted, so a failed delete goes unnoticed here
            validate_network(self.neutron, self.network['network']['name'], False)

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function with an empty network name
        """
        with self.assertRaises(Exception):
            # os_creds is passed like in test_create_network so that the Exception cannot simply come from calling
            # create_network() with the wrong number of arguments
            self.network = neutron_utils.create_network(self.neutron, self.os_creds, NetworkSettings(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function when the network name is None
        """
        with self.assertRaises(Exception):
            # Same call shape as test_create_network; only the missing name should be able to trigger the Exception
            self.network = neutron_utils.create_network(self.neutron, self.os_creds, NetworkSettings())
120
121
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet configuration; self.network and self.subnet
        stay None until a test creates them so tearDown knows what to delete
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        # NOTE(review): the validate_* results below are not asserted, so a failed delete goes unnoticed here
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def _make_network(self):
        """
        Creates the network described by self.net_config, stores it in self.network for tearDown and asserts that
        it exists under the configured name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._make_network()

        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds,
                                                  network=self.network)
        # NOTE(review): the result of validate_subnet is not asserted - confirm its return shape before asserting
        validate_subnet(self.neutron, subnet_settings.name, subnet_settings.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Tests that constructing SubnetSettings without a name raises an Exception
        """
        self._make_network()

        with self.assertRaises(Exception):
            # NOTE(review): net_config.subnet_cidr is read inside this block, so a missing attribute would also
            # satisfy the test - confirm the attribute exists
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates a subnet from the configured subnet settings and then looks up a subnet with an empty name
        """
        self._make_network()

        subnet_settings = self.net_config.network_settings.subnet_settings[0]
        # Keep the handle so that tearDown deletes the subnet explicitly instead of leaving it untracked
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds,
                                                  network=self.network)
        # NOTE(review): the subnet above is created with the configured (non-empty) name and the result of this
        # lookup is not asserted - confirm what this test is meant to verify
        validate_subnet(self.neutron, '', subnet_settings.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an Exception when the subnet CIDR value is None
        """
        self._make_network()

        with self.assertRaises(Exception):
            # NOTE(review): net_config.subnet_name is read inside this block, so a missing attribute would also
            # satisfy the test - confirm the attribute exists
            sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an Exception when the subnet CIDR value is empty
        """
        self._make_network()

        with self.assertRaises(Exception):
            # NOTE(review): net_config.subnet_name is read inside this block, so a missing attribute would also
            # satisfy the test - confirm the attribute exists
            sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
208
209
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Test for creating routers, router interfaces and ports via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a uniquely named network/subnet/router configuration; every handle that
        tearDown may have to clean up starts out as None
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects in this order: router interface, router, port, subnet, network
        """
        # NOTE(review): the validate_* results below are not asserted, so a failed delete goes unnoticed here
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            validate_router(self.neutron, self.router.get('name'), False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def _first_subnet_settings(self):
        """
        Returns the first entry of the configured subnet settings list, the only one these tests use
        """
        return self.net_config.network_settings.subnet_settings[0]

    def _make_network(self):
        """
        Creates the network described by self.net_config, stores it in self.network for tearDown and asserts that
        it exists under the configured name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def _make_subnet(self):
        """
        Creates the configured subnet on self.network and stores it in self.subnet for tearDown
        """
        subnet_settings = self._first_subnet_settings()
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_settings, self.os_creds, self.network)
        # NOTE(review): the result of validate_subnet is not asserted - confirm its return shape before asserting
        validate_subnet(self.neutron, subnet_settings.name, subnet_settings.cidr, True)

    def _make_router(self):
        """
        Creates the configured router and stores it in self.router for tearDown
        """
        self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
        # NOTE(review): the result of validate_router is not asserted
        validate_router(self.neutron, self.net_config.router_settings.name, True)

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the router settings built in setUp
        """
        self._make_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function with a configuration rebuilt through OSNetworkConfig
        that names the external network
        """
        subnet_settings = self._first_subnet_settings()
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            subnet_settings.name,
            subnet_settings.cidr, self.net_config.router_settings.name,
            self.ext_net_name)
        self._make_router()
        # TODO - Add validation that the router gateway has been set

    def test_create_router_empty_name(self):
        """
        Tests the neutron_utils.create_router() function for an Exception when the router name is empty
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)

    def test_create_router_null_name(self):
        """
        Tests the neutron_utils.create_router() function for an Exception when the router name is None
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._make_network()
        self._make_subnet()
        self._make_router()

        self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
        # NOTE(review): the result of validate_interface_router is not asserted
        validate_interface_router(self.interface_router, self.router, self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
        """
        self._make_network()
        self._make_subnet()

        # self.router is still None here because no router has been created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
        """
        self._make_network()
        self._make_router()

        # self.subnet is still None here because no subnet has been created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._make_network()
        self._make_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name,
                ip_addrs=[{'subnet_name': self._first_subnet_settings().name, 'ip': ip_1}],
                network_name=self.net_config.network_settings.name))
        # NOTE(review): the result of validate_port is not asserted
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._make_network()
        self._make_subnet()

        # NOTE(review): despite the test name the port is created with self.port_name, exactly like
        # test_create_port - confirm what this test is meant to cover
        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': self._first_subnet_settings().name, 'ip': ip_1}]))
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the port name value is None
        """
        self._make_network()
        self._make_subnet()

        # Resolved outside the assertRaises block so that a lookup error cannot masquerade as the expected failure
        subnet_name = self._first_subnet_settings().name
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': subnet_name, 'ip': ip_1}]))

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the network object is None
        """
        self._make_network()
        self._make_subnet()

        subnet_name = self._first_subnet_settings().name
        with self.assertRaises(Exception):
            # NOTE(review): the neutron client is passed as the first positional PortSettings argument - confirm
            # this matches the PortSettings signature and the "null network" intent of this test
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                self.neutron, self.port_name, self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': subnet_name, 'ip': ip_1}]))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is None
        """
        self._make_network()
        self._make_subnet()

        subnet_name = self._first_subnet_settings().name
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': subnet_name, 'ip': None}]))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is not an IP address
        """
        self._make_network()
        self._make_subnet()

        subnet_name = self._first_subnet_settings().name
        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': subnet_name, 'ip': 'foo'}]))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is 10.197.123.100
        """
        self._make_network()
        self._make_subnet()

        subnet_name = self._first_subnet_settings().name
        with self.assertRaises(Exception):
            # NOTE(review): presumably this address lies outside the configured subnet CIDR - confirm
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=[{'subnet_name': subnet_name,
                           'ip': '10.197.123.100'}]))
466
467
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients and a unique security group name; the group handle and the rule
        list start out empty so tearDown only deletes what a test actually created
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_group = None
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        if self.security_group:
            neutron_utils.delete_security_group(self.neutron, self.security_group)

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

        # assertEqual, not assertTrue: assertTrue would treat the second value as the failure message
        self.assertEqual(sec_grp_settings.name, self.security_group['security_group']['name'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_group['security_group'], sec_grp_get['security_group']))

        neutron_utils.delete_security_group(self.neutron, self.security_group)
        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNone(sec_grp_get)
        # Already deleted above, so tearDown must not try to delete it again
        self.security_group = None

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
        that attempting to create a security group without a name will raise an exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function with a name and description but no rules
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
        self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

        self.assertEqual(sec_grp_settings.name, self.security_group['security_group']['name'])
        self.assertEqual(sec_grp_settings.description, self.security_group['security_group']['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_group['security_group'], sec_grp_get['security_group']))

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and neutron_utils.create_security_group_rule() functions
        with a single ingress rule
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
                                                 rule_settings=[sec_grp_rule_settings])

        self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

        # Rules already attached to the new group are tracked as well so that tearDown deletes them
        free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group)
        self.security_group_rules.extend(free_rules)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        self.security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)

        rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group)

        self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, self.security_group['security_group']['name'])
        self.assertEqual(sec_grp_settings.description, self.security_group['security_group']['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_group['security_group'], sec_grp_get['security_group']))
566
567
568 """
569 Validation routines
570 """
571
572
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network matches the expectation.
    Returns True when the network is found and exists is true, or when it is not found and exists is false;
    returns False in the two remaining cases.
    :param neutron: The neutron client
    :param name: The network name to look up
    :param exists: Whether or not a network with that name is expected to exist
    :return: True/False
    """
    found = bool(neutron_utils.get_network(neutron, name))
    expected = bool(exists)
    return found == expected
588
589
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence (and CIDR) of a subnet matches the expectation.
    When the subnet is found and exists is true, the result is whether its 'cidr' value equals cidr.
    When the subnet is not found and exists is false, the result is True. Every other combination yields False.
    :param neutron: The neutron client
    :param name: The subnet name to look up
    :param cidr: The expected CIDR value
    :param exists: Whether or not a subnet with that name is expected to exist
    :return: True/False
    """
    subnet = neutron_utils.get_subnet_by_name(neutron, name)
    if not subnet:
        # Nothing found: that is only correct when nothing was expected
        return not exists
    if exists:
        return subnet.get('cidr') == cidr
    return False
606
607
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router matches the expectation.
    Returns True when the router is found and exists is true, or when it is not found and exists is false;
    returns False in the two remaining cases.
    :param neutron: The neutron client
    :param name: The router name to look up
    :param exists: Whether or not a router with that name is expected to exist
    :return: True/False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    if exists:
        return bool(router)
    # Absence was expected: succeed only when the lookup found nothing
    return not router
621
622
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into the interface router object
    :param interface_router: the object to validate; its 'subnet_id' and 'id' values are compared
    :param router: to validate against the interface_router (its 'id' value is used)
    :param subnet: to validate against the interface_router (its 'id' value is used)
    :return: True if both IDs match else False
    """
    subnet_id = interface_router.get('subnet_id')
    # The router's ID is held under 'id'; 'port_id' identifies the port created for the interface, not the router
    router_id = interface_router.get('id')

    return subnet.get('id') == subnet_id and router.get('id') == router_id
635
636
def validate_port(neutron, port_obj, this_port_name):
    """
    Returns true if the port listed by neutron under the ID of port_obj carries the expected name.
    Returns false if that port has a different name or if no listed port has that ID.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (its ['port']['id'] value is used)
    :param this_port_name: The expected port name
    :return: True/False
    """
    ports = neutron.list_ports()
    # values() instead of iteritems(): the key is not used and iteritems() does not exist on Python 3 dicts
    for port_insts in ports.values():
        for inst in port_insts:
            if inst['id'] == port_obj['port']['id']:
                return inst['name'] == this_port_name
    return False