SNAPS Stack creators can now return SNAPS network creators.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
35 class NeutronSmokeTests(OSComponentTestCase):
36     """
37     Tests to ensure that the neutron client can communicate with the cloud
38     """
39
40     def test_neutron_connect_success(self):
41         """
42         Tests to ensure that the proper credentials can connect.
43         """
44         neutron = neutron_utils.neutron_client(self.os_creds)
45
46         networks = neutron.list_networks()
47
48         found = False
49         networks = networks.get('networks')
50         for network in networks:
51             if network.get('name') == self.ext_net_name:
52                 found = True
53         self.assertTrue(found)
54
55     def test_neutron_connect_fail(self):
56         """
57         Tests to ensure that the improper credentials cannot connect.
58         """
59         from snaps.openstack.os_credentials import OSCreds
60
61         with self.assertRaises(Exception):
62             neutron = neutron_utils.neutron_client(
63                 OSCreds(username='user', password='pass', auth_url='url',
64                         project_name='project'))
65             neutron.list_networks()
66
67     def test_retrieve_ext_network_name(self):
68         """
69         Tests the neutron_utils.get_external_network_names to ensure the
70         configured self.ext_net_name is contained within the returned list
71         :return:
72         """
73         neutron = neutron_utils.neutron_client(self.os_creds)
74         ext_networks = neutron_utils.get_external_networks(neutron)
75         found = False
76         for network in ext_networks:
77             if network.name == self.ext_net_name:
78                 found = True
79                 break
80         self.assertTrue(found)
81
82
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
84     """
85     Test for creating networks via neutron_utils.py
86     """
87
88     def setUp(self):
89         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90         self.port_name = str(guid) + '-port'
91         self.neutron = neutron_utils.neutron_client(self.os_creds)
92         self.network = None
93         self.net_config = openstack_tests.get_pub_net_config(
94             net_name=guid + '-pub-net')
95
96     def tearDown(self):
97         """
98         Cleans the remote OpenStack objects
99         """
100         if self.network:
101             neutron_utils.delete_network(self.neutron, self.network)
102
103     def test_create_network(self):
104         """
105         Tests the neutron_utils.create_neutron_net() function
106         """
107         self.network = neutron_utils.create_network(
108             self.neutron, self.os_creds, self.net_config.network_settings)
109         self.assertEqual(self.net_config.network_settings.name,
110                          self.network.name)
111         self.assertTrue(validate_network(
112             self.neutron, self.net_config.network_settings.name, True))
113
114     def test_create_network_empty_name(self):
115         """
116         Tests the neutron_utils.create_neutron_net() function with an empty
117         network name
118         """
119         with self.assertRaises(Exception):
120             self.network = neutron_utils.create_network(
121                 self.neutron, self.os_creds,
122                 network_settings=NetworkSettings(name=''))
123
124     def test_create_network_null_name(self):
125         """
126         Tests the neutron_utils.create_neutron_net() function when the network
127         name is None
128         """
129         with self.assertRaises(Exception):
130             self.network = neutron_utils.create_network(
131                 self.neutron, self.os_creds,
132                 network_settings=NetworkSettings())
133
134
135 class NeutronUtilsSubnetTests(OSComponentTestCase):
136     """
137     Test for creating networks with subnets via neutron_utils.py
138     """
139
140     def setUp(self):
141         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
142         self.port_name = str(guid) + '-port'
143         self.neutron = neutron_utils.neutron_client(self.os_creds)
144         self.network = None
145         self.subnet = None
146         self.net_config = openstack_tests.get_pub_net_config(
147             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
148             external_net=self.ext_net_name)
149
150     def tearDown(self):
151         """
152         Cleans the remote OpenStack objects
153         """
154         if self.subnet:
155             try:
156                 neutron_utils.delete_subnet(self.neutron, self.subnet)
157             except:
158                 pass
159         if self.network:
160             try:
161                 neutron_utils.delete_network(self.neutron, self.network)
162             except:
163                 pass
164
165     def test_create_subnet(self):
166         """
167         Tests the neutron_utils.create_neutron_net() function
168         """
169         self.network = neutron_utils.create_network(
170             self.neutron, self.os_creds, self.net_config.network_settings)
171         self.assertEqual(self.net_config.network_settings.name,
172                          self.network.name)
173         self.assertTrue(validate_network(
174             self.neutron, self.net_config.network_settings.name, True))
175
176         subnet_setting = self.net_config.network_settings.subnet_settings[0]
177         self.subnet = neutron_utils.create_subnet(
178             self.neutron, subnet_setting, self.os_creds, network=self.network)
179         self.assertTrue(validate_subnet(
180             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
181
182         subnet_query1 = neutron_utils.get_subnet(
183             self.neutron, subnet_name=subnet_setting.name)
184         self.assertEqual(self.subnet, subnet_query1)
185
186         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
187                                                              self.network)
188         self.assertIsNotNone(subnet_query2)
189         self.assertEqual(1, len(subnet_query2))
190         self.assertEqual(self.subnet, subnet_query2[0])
191
192     def test_create_subnet_null_name(self):
193         """
194         Tests the neutron_utils.create_neutron_subnet() function for an
195         Exception when the subnet name is None
196         """
197         self.network = neutron_utils.create_network(
198             self.neutron, self.os_creds, self.net_config.network_settings)
199         self.assertEqual(self.net_config.network_settings.name,
200                          self.network.name)
201         self.assertTrue(validate_network(
202             self.neutron, self.net_config.network_settings.name, True))
203
204         with self.assertRaises(Exception):
205             SubnetSettings(cidr=self.net_config.subnet_cidr)
206
207     def test_create_subnet_empty_name(self):
208         """
209         Tests the neutron_utils.create_neutron_net() function with an empty
210         name
211         """
212         self.network = neutron_utils.create_network(
213             self.neutron, self.os_creds, self.net_config.network_settings)
214         self.assertEqual(self.net_config.network_settings.name,
215                          self.network.name)
216         self.assertTrue(validate_network(
217             self.neutron, self.net_config.network_settings.name, True))
218
219         subnet_setting = self.net_config.network_settings.subnet_settings[0]
220         self.subnet = neutron_utils.create_subnet(
221             self.neutron, subnet_setting, self.os_creds, network=self.network)
222         self.assertTrue(validate_subnet(
223             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
224         self.assertFalse(validate_subnet(
225             self.neutron, '', subnet_setting.cidr, True))
226
227         subnet_query1 = neutron_utils.get_subnet(
228             self.neutron, subnet_name=subnet_setting.name)
229         self.assertEqual(self.subnet, subnet_query1)
230
231         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
232                                                              self.network)
233         self.assertIsNotNone(subnet_query2)
234         self.assertEqual(1, len(subnet_query2))
235         self.assertEqual(self.subnet, subnet_query2[0])
236
237     def test_create_subnet_null_cidr(self):
238         """
239         Tests the neutron_utils.create_neutron_subnet() function for an
240         Exception when the subnet CIDR value is None
241         """
242         self.network = neutron_utils.create_network(
243             self.neutron, self.os_creds, self.net_config.network_settings)
244         self.assertEqual(self.net_config.network_settings.name,
245                          self.network.name)
246         self.assertTrue(validate_network(
247             self.neutron, self.net_config.network_settings.name, True))
248
249         with self.assertRaises(Exception):
250             sub_sets = SubnetSettings(
251                 cidr=None, name=self.net_config.subnet_name)
252             neutron_utils.create_subnet(
253                 self.neutron, sub_sets, self.os_creds, network=self.network)
254
255     def test_create_subnet_empty_cidr(self):
256         """
257         Tests the neutron_utils.create_neutron_subnet() function for an
258         Exception when the subnet CIDR value is empty
259         """
260         self.network = neutron_utils.create_network(
261             self.neutron, self.os_creds, self.net_config.network_settings)
262         self.assertEqual(self.net_config.network_settings.name,
263                          self.network.name)
264         self.assertTrue(validate_network(
265             self.neutron, self.net_config.network_settings.name, True))
266
267         with self.assertRaises(Exception):
268             sub_sets = SubnetSettings(
269                 cidr='', name=self.net_config.subnet_name)
270             neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
271                                         network=self.network)
272
273
274 class NeutronUtilsRouterTests(OSComponentTestCase):
275     """
276     Test for creating routers via neutron_utils.py
277     """
278
279     def setUp(self):
280         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
281         self.port_name = str(guid) + '-port'
282         self.neutron = neutron_utils.neutron_client(self.os_creds)
283         self.network = None
284         self.subnet = None
285         self.port = None
286         self.router = None
287         self.interface_router = None
288         self.net_config = openstack_tests.get_pub_net_config(
289             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
290             router_name=guid + '-pub-router', external_net=self.ext_net_name)
291
292     def tearDown(self):
293         """
294         Cleans the remote OpenStack objects
295         """
296         if self.interface_router:
297             neutron_utils.remove_interface_router(self.neutron, self.router,
298                                                   self.subnet)
299
300         if self.router:
301             try:
302                 neutron_utils.delete_router(self.neutron, self.router)
303                 validate_router(self.neutron, self.router.name, False)
304             except:
305                 pass
306
307         if self.port:
308             try:
309                 neutron_utils.delete_port(self.neutron, self.port)
310             except:
311                 pass
312
313         if self.subnet:
314             try:
315                 neutron_utils.delete_subnet(self.neutron, self.subnet)
316             except:
317                 pass
318
319         if self.network:
320             try:
321                 neutron_utils.delete_network(self.neutron, self.network)
322             except:
323                 pass
324
325     def test_create_router_simple(self):
326         """
327         Tests the neutron_utils.create_neutron_net() function when an external
328         gateway is requested
329         """
330         self.router = neutron_utils.create_router(
331             self.neutron, self.os_creds, self.net_config.router_settings)
332         validate_router(self.neutron, self.net_config.router_settings.name,
333                         True)
334
335     def test_create_router_with_public_interface(self):
336         """
337         Tests the neutron_utils.create_neutron_net() function when an external
338         gateway is requested
339         """
340         subnet_setting = self.net_config.network_settings.subnet_settings[0]
341         self.net_config = openstack_tests.OSNetworkConfig(
342             self.net_config.network_settings.name,
343             subnet_setting.name,
344             subnet_setting.cidr,
345             self.net_config.router_settings.name,
346             self.ext_net_name)
347         self.router = neutron_utils.create_router(
348             self.neutron, self.os_creds, self.net_config.router_settings)
349         validate_router(self.neutron, self.net_config.router_settings.name,
350                         True)
351
352         ext_net = neutron_utils.get_network(
353             self.neutron, network_name=self.ext_net_name)
354         self.assertEqual(
355             self.router.external_gateway_info['network_id'], ext_net.id)
356
357     def test_create_router_empty_name(self):
358         """
359         Tests the neutron_utils.create_neutron_net() function
360         """
361         with self.assertRaises(Exception):
362             this_router_settings = create_router.RouterSettings(name='')
363             self.router = neutron_utils.create_router(self.neutron,
364                                                       self.os_creds,
365                                                       this_router_settings)
366
367     def test_create_router_null_name(self):
368         """
369         Tests the neutron_utils.create_neutron_subnet() function when the
370         subnet CIDR value is None
371         """
372         with self.assertRaises(Exception):
373             this_router_settings = create_router.RouterSettings()
374             self.router = neutron_utils.create_router(self.neutron,
375                                                       self.os_creds,
376                                                       this_router_settings)
377             validate_router(self.neutron, None, True)
378
379     def test_add_interface_router(self):
380         """
381         Tests the neutron_utils.add_interface_router() function
382         """
383         self.network = neutron_utils.create_network(
384             self.neutron, self.os_creds, self.net_config.network_settings)
385         self.assertEqual(self.net_config.network_settings.name,
386                          self.network.name)
387         self.assertTrue(validate_network(
388             self.neutron, self.net_config.network_settings.name, True))
389
390         subnet_setting = self.net_config.network_settings.subnet_settings[0]
391         self.subnet = neutron_utils.create_subnet(
392             self.neutron, subnet_setting,
393             self.os_creds, self.network)
394         self.assertTrue(validate_subnet(
395             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
396
397         self.router = neutron_utils.create_router(
398             self.neutron, self.os_creds, self.net_config.router_settings)
399         validate_router(self.neutron, self.net_config.router_settings.name,
400                         True)
401
402         self.interface_router = neutron_utils.add_interface_router(
403             self.neutron, self.router, self.subnet)
404         validate_interface_router(self.interface_router, self.router,
405                                   self.subnet)
406
407     def test_add_interface_router_null_router(self):
408         """
409         Tests the neutron_utils.add_interface_router() function for an
410         Exception when the router value is None
411         """
412         self.network = neutron_utils.create_network(
413             self.neutron, self.os_creds, self.net_config.network_settings)
414         self.assertEqual(self.net_config.network_settings.name,
415                          self.network.name)
416         self.assertTrue(validate_network(
417             self.neutron, self.net_config.network_settings.name, True))
418
419         subnet_setting = self.net_config.network_settings.subnet_settings[0]
420         self.subnet = neutron_utils.create_subnet(
421             self.neutron, subnet_setting,
422             self.os_creds, self.network)
423         self.assertTrue(validate_subnet(
424             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
425
426         with self.assertRaises(NeutronException):
427             self.interface_router = neutron_utils.add_interface_router(
428                 self.neutron, self.router, self.subnet)
429
430     def test_add_interface_router_null_subnet(self):
431         """
432         Tests the neutron_utils.add_interface_router() function for an
433         Exception when the subnet value is None
434         """
435         self.network = neutron_utils.create_network(
436             self.neutron, self.os_creds, self.net_config.network_settings)
437         self.assertEqual(self.net_config.network_settings.name,
438                          self.network.name)
439         self.assertTrue(validate_network(
440             self.neutron, self.net_config.network_settings.name, True))
441
442         self.router = neutron_utils.create_router(
443             self.neutron, self.os_creds, self.net_config.router_settings)
444         validate_router(self.neutron, self.net_config.router_settings.name,
445                         True)
446
447         with self.assertRaises(NeutronException):
448             self.interface_router = neutron_utils.add_interface_router(
449                 self.neutron, self.router, self.subnet)
450
451     def test_create_port(self):
452         """
453         Tests the neutron_utils.create_port() function
454         """
455         self.network = neutron_utils.create_network(
456             self.neutron, self.os_creds, self.net_config.network_settings)
457         self.assertEqual(self.net_config.network_settings.name,
458                          self.network.name)
459         self.assertTrue(validate_network(
460             self.neutron, self.net_config.network_settings.name, True))
461
462         subnet_setting = self.net_config.network_settings.subnet_settings[0]
463         self.subnet = neutron_utils.create_subnet(
464             self.neutron, subnet_setting, self.os_creds, self.network)
465         self.assertTrue(validate_subnet(
466             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
467
468         self.port = neutron_utils.create_port(
469             self.neutron, self.os_creds, PortSettings(
470                 name=self.port_name,
471                 ip_addrs=[{
472                     'subnet_name': subnet_setting.name,
473                     'ip': ip_1}],
474                 network_name=self.net_config.network_settings.name))
475         validate_port(self.neutron, self.port, self.port_name)
476
477     def test_create_port_empty_name(self):
478         """
479         Tests the neutron_utils.create_port() function
480         """
481         self.network = neutron_utils.create_network(
482             self.neutron, self.os_creds, self.net_config.network_settings)
483         self.assertEqual(self.net_config.network_settings.name,
484                          self.network.name)
485         self.assertTrue(validate_network(
486             self.neutron, self.net_config.network_settings.name, True))
487
488         subnet_setting = self.net_config.network_settings.subnet_settings[0]
489         self.subnet = neutron_utils.create_subnet(
490             self.neutron, subnet_setting, self.os_creds, self.network)
491         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
492                                         subnet_setting.cidr, True))
493
494         self.port = neutron_utils.create_port(
495             self.neutron, self.os_creds, PortSettings(
496                 name=self.port_name,
497                 network_name=self.net_config.network_settings.name,
498                 ip_addrs=[{
499                     'subnet_name': subnet_setting.name,
500                     'ip': ip_1}]))
501         validate_port(self.neutron, self.port, self.port_name)
502
503     def test_create_port_null_name(self):
504         """
505         Tests the neutron_utils.create_port() function for an Exception when
506         the port name value is None
507         """
508         self.network = neutron_utils.create_network(
509             self.neutron, self.os_creds, self.net_config.network_settings)
510         self.assertEqual(self.net_config.network_settings.name,
511                          self.network.name)
512         self.assertTrue(validate_network(
513             self.neutron, self.net_config.network_settings.name, True))
514
515         subnet_setting = self.net_config.network_settings.subnet_settings[0]
516         self.subnet = neutron_utils.create_subnet(
517             self.neutron, subnet_setting,
518             self.os_creds, self.network)
519         self.assertTrue(validate_subnet(
520             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
521
522         with self.assertRaises(Exception):
523             self.port = neutron_utils.create_port(
524                 self.neutron, self.os_creds,
525                 PortSettings(
526                     network_name=self.net_config.network_settings.name,
527                     ip_addrs=[{
528                         'subnet_name': subnet_setting.name,
529                         'ip': ip_1}]))
530
531     def test_create_port_null_network_object(self):
532         """
533         Tests the neutron_utils.create_port() function for an Exception when
534         the network object is None
535         """
536         with self.assertRaises(Exception):
537             self.port = neutron_utils.create_port(
538                 self.neutron, self.os_creds,
539                 PortSettings(
540                     name=self.port_name,
541                     network_name=self.net_config.network_settings.name,
542                     ip_addrs=[{
543                         'subnet_name':
544                             self.net_config.network_settings.subnet_settings[
545                                 0].name,
546                         'ip': ip_1}]))
547
548     def test_create_port_null_ip(self):
549         """
550         Tests the neutron_utils.create_port() function for an Exception when
551         the IP value is None
552         """
553         self.network = neutron_utils.create_network(
554             self.neutron, self.os_creds, self.net_config.network_settings)
555         self.assertEqual(self.net_config.network_settings.name,
556                          self.network.name)
557         self.assertTrue(validate_network(
558             self.neutron, self.net_config.network_settings.name, True))
559
560         subnet_setting = self.net_config.network_settings.subnet_settings[0]
561         self.subnet = neutron_utils.create_subnet(
562             self.neutron, subnet_setting,
563             self.os_creds, self.network)
564         self.assertTrue(validate_subnet(
565             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
566
567         with self.assertRaises(Exception):
568             self.port = neutron_utils.create_port(
569                 self.neutron, self.os_creds,
570                 PortSettings(
571                     name=self.port_name,
572                     network_name=self.net_config.network_settings.name,
573                     ip_addrs=[{
574                         'subnet_name': subnet_setting.name,
575                         'ip': None}]))
576
577     def test_create_port_invalid_ip(self):
578         """
579         Tests the neutron_utils.create_port() function for an Exception when
580         the IP value is None
581         """
582         self.network = neutron_utils.create_network(
583             self.neutron, self.os_creds, self.net_config.network_settings)
584         self.assertEqual(self.net_config.network_settings.name,
585                          self.network.name)
586         self.assertTrue(validate_network(
587             self.neutron, self.net_config.network_settings.name, True))
588
589         subnet_setting = self.net_config.network_settings.subnet_settings[0]
590         self.subnet = neutron_utils.create_subnet(
591             self.neutron, subnet_setting, self.os_creds, self.network)
592         self.assertTrue(validate_subnet(
593             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
594
595         with self.assertRaises(Exception):
596             self.port = neutron_utils.create_port(
597                 self.neutron, self.os_creds,
598                 PortSettings(
599                     name=self.port_name,
600                     network_name=self.net_config.network_settings.name,
601                     ip_addrs=[{
602                         'subnet_name': subnet_setting.name,
603                         'ip': 'foo'}]))
604
605     def test_create_port_invalid_ip_to_subnet(self):
606         """
607         Tests the neutron_utils.create_port() function for an Exception when
608         the IP value is None
609         """
610         self.network = neutron_utils.create_network(
611             self.neutron, self.os_creds, self.net_config.network_settings)
612         self.assertEqual(self.net_config.network_settings.name,
613                          self.network.name)
614         self.assertTrue(validate_network(
615             self.neutron, self.net_config.network_settings.name, True))
616
617         subnet_setting = self.net_config.network_settings.subnet_settings[0]
618         self.subnet = neutron_utils.create_subnet(
619             self.neutron, subnet_setting, self.os_creds, self.network)
620         self.assertTrue(validate_subnet(
621             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
622
623         with self.assertRaises(Exception):
624             self.port = neutron_utils.create_port(
625                 self.neutron, self.os_creds,
626                 PortSettings(
627                     name=self.port_name,
628                     network_name=self.net_config.network_settings.name,
629                     ip_addrs=[{
630                         'subnet_name': subnet_setting.name,
631                         'ip': '10.197.123.100'}]))
632
633
634 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
635     """
636     Test for creating security groups via neutron_utils.py
637     """
638
639     def setUp(self):
640         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
641         self.sec_grp_name = guid + 'name'
642
643         self.security_groups = list()
644         self.security_group_rules = list()
645         self.neutron = neutron_utils.neutron_client(self.os_creds)
646         self.keystone = keystone_utils.keystone_client(self.os_creds)
647
648     def tearDown(self):
649         """
650         Cleans the remote OpenStack objects
651         """
652         for rule in self.security_group_rules:
653             neutron_utils.delete_security_group_rule(self.neutron, rule)
654
655         for security_group in self.security_groups:
656             try:
657                 neutron_utils.delete_security_group(self.neutron,
658                                                     security_group)
659             except:
660                 pass
661
662     def test_create_delete_simple_sec_grp(self):
663         """
664         Tests the neutron_utils.create_security_group() function
665         """
666         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
667         security_group = neutron_utils.create_security_group(self.neutron,
668                                                              self.keystone,
669                                                              sec_grp_settings)
670
671         self.assertTrue(sec_grp_settings.name, security_group.name)
672
673         sec_grp_get = neutron_utils.get_security_group(
674             self.neutron, sec_grp_settings=sec_grp_settings)
675         self.assertIsNotNone(sec_grp_get)
676         self.assertTrue(validation_utils.objects_equivalent(
677             security_group, sec_grp_get))
678
679         neutron_utils.delete_security_group(self.neutron, security_group)
680         sec_grp_get = neutron_utils.get_security_group(
681             self.neutron, sec_grp_settings=sec_grp_settings)
682         self.assertIsNone(sec_grp_get)
683
684     def test_create_sec_grp_no_name(self):
685         """
686         Tests the SecurityGroupSettings constructor and
687         neutron_utils.create_security_group() function to ensure that
688         attempting to create a security group without a name will raise an
689         exception
690         """
691         with self.assertRaises(Exception):
692             sec_grp_settings = SecurityGroupSettings()
693             self.security_groups.append(
694                 neutron_utils.create_security_group(self.neutron,
695                                                     self.keystone,
696                                                     sec_grp_settings))
697
698     def test_create_sec_grp_no_rules(self):
699         """
700         Tests the neutron_utils.create_security_group() function
701         """
702         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
703                                                  description='hello group')
704         self.security_groups.append(
705             neutron_utils.create_security_group(self.neutron, self.keystone,
706                                                 sec_grp_settings))
707
708         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
709         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
710
711         sec_grp_get = neutron_utils.get_security_group(
712             self.neutron, sec_grp_settings=sec_grp_settings)
713         self.assertIsNotNone(sec_grp_get)
714         self.assertEqual(self.security_groups[0], sec_grp_get)
715
716     def test_create_sec_grp_one_rule(self):
717         """
718         Tests the neutron_utils.create_security_group() function
719         """
720
721         sec_grp_rule_settings = SecurityGroupRuleSettings(
722             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
723         sec_grp_settings = SecurityGroupSettings(
724             name=self.sec_grp_name, description='hello group',
725             rule_settings=[sec_grp_rule_settings])
726
727         self.security_groups.append(
728             neutron_utils.create_security_group(self.neutron, self.keystone,
729                                                 sec_grp_settings))
730         free_rules = neutron_utils.get_rules_by_security_group(
731             self.neutron, self.security_groups[0])
732         for free_rule in free_rules:
733             self.security_group_rules.append(free_rule)
734
735         self.security_group_rules.append(
736             neutron_utils.create_security_group_rule(
737                 self.neutron, sec_grp_settings.rule_settings[0]))
738
739         # Refresh object so it is populated with the newly added rule
740         security_group = neutron_utils.get_security_group(
741             self.neutron, sec_grp_settings=sec_grp_settings)
742
743         rules = neutron_utils.get_rules_by_security_group(self.neutron,
744                                                           security_group)
745
746         self.assertTrue(
747             validation_utils.objects_equivalent(
748                  self.security_group_rules, rules))
749
750         self.assertTrue(sec_grp_settings.name, security_group.name)
751
752         sec_grp_get = neutron_utils.get_security_group(
753             self.neutron, sec_grp_settings=sec_grp_settings)
754         self.assertIsNotNone(sec_grp_get)
755         self.assertEqual(security_group, sec_grp_get)
756
757     def test_get_sec_grp_by_id(self):
758         """
759         Tests the neutron_utils.create_security_group() function
760         """
761
762         self.security_groups.append(neutron_utils.create_security_group(
763             self.neutron, self.keystone,
764             SecurityGroupSettings(name=self.sec_grp_name + '-1',
765                                   description='hello group')))
766         self.security_groups.append(neutron_utils.create_security_group(
767             self.neutron, self.keystone,
768             SecurityGroupSettings(name=self.sec_grp_name + '-2',
769                                   description='hello group')))
770
771         sec_grp_1b = neutron_utils.get_security_group_by_id(
772             self.neutron, self.security_groups[0].id)
773         sec_grp_2b = neutron_utils.get_security_group_by_id(
774             self.neutron, self.security_groups[1].id)
775
776         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
777         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
778
779
780 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
781     """
782     Test basic nova keypair functionality
783     """
784
785     def setUp(self):
786         """
787         Instantiates the CreateImage object that is responsible for downloading
788         and creating an OS image file within OpenStack
789         """
790         self.neutron = neutron_utils.neutron_client(self.os_creds)
791         self.floating_ip = None
792
793     def tearDown(self):
794         """
795         Cleans the image and downloaded image file
796         """
797         if self.floating_ip:
798             try:
799                 neutron_utils.delete_floating_ip(
800                     self.neutron, self.floating_ip)
801             except:
802                 pass
803
804     def test_floating_ips(self):
805         """
806         Tests the creation of a floating IP
807         :return:
808         """
809         initial_fips = neutron_utils.get_floating_ips(self.neutron)
810
811         self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
812                                                             self.ext_net_name)
813         all_fips = neutron_utils.get_floating_ips(self.neutron)
814         self.assertEqual(len(initial_fips) + 1, len(all_fips))
815         returned = neutron_utils.get_floating_ip(self.neutron,
816                                                  self.floating_ip)
817         self.assertEqual(self.floating_ip.id, returned.id)
818         self.assertEqual(self.floating_ip.ip, returned.ip)
819
820
821 """
822 Validation routines
823 """
824
825
826 def validate_network(neutron, name, exists):
827     """
828     Returns true if a network for a given name DOES NOT exist if the exists
829     parameter is false conversely true. Returns false if a network for a given
830     name DOES exist if the exists parameter is true conversely false.
831     :param neutron: The neutron client
832     :param name: The expected network name
833     :param exists: Whether or not the network name should exist or not
834     :return: True/False
835     """
836     network = neutron_utils.get_network(neutron, network_name=name)
837     if exists and network:
838         return True
839     if not exists and not network:
840         return True
841     return False
842
843
844 def validate_subnet(neutron, name, cidr, exists):
845     """
846     Returns true if a subnet for a given name DOES NOT exist if the exists
847     parameter is false conversely true. Returns false if a subnet for a given
848     name DOES exist if the exists parameter is true conversely false.
849     :param neutron: The neutron client
850     :param name: The expected subnet name
851     :param cidr: The expected CIDR value
852     :param exists: Whether or not the network name should exist or not
853     :return: True/False
854     """
855     subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
856     if exists and subnet and subnet.name == name:
857         return subnet.cidr == cidr
858     if not exists and not subnet:
859         return True
860     return False
861
862
863 def validate_router(neutron, name, exists):
864     """
865     Returns true if a router for a given name DOES NOT exist if the exists
866     parameter is false conversely true. Returns false if a router for a given
867     name DOES exist if the exists parameter is true conversely false.
868     :param neutron: The neutron client
869     :param name: The expected router name
870     :param exists: Whether or not the network name should exist or not
871     :return: True/False
872     """
873     router = neutron_utils.get_router(neutron, router_name=name)
874     if exists and router:
875         return True
876     return False
877
878
879 def validate_interface_router(interface_router, router, subnet):
880     """
881     Returns true if the router ID & subnet ID have been properly included into
882     the interface router object
883     :param interface_router: the SNAPS-OO InterfaceRouter domain object
884     :param router: to validate against the interface_router
885     :param subnet: to validate against the interface_router
886     :return: True if both IDs match else False
887     """
888     subnet_id = interface_router.subnet_id
889     router_id = interface_router.port_id
890
891     return subnet.id == subnet_id and router.id == router_id
892
893
894 def validate_port(neutron, port_obj, this_port_name):
895     """
896     Returns true if a port for a given name DOES NOT exist if the exists
897     parameter is false conversely true. Returns false if a port for a given
898     name DOES exist if the exists parameter is true conversely false.
899     :param neutron: The neutron client
900     :param port_obj: The port object to lookup
901     :param this_port_name: The expected router name
902     :return: True/False
903     """
904     os_ports = neutron.list_ports()
905     for os_port, os_port_insts in os_ports.items():
906         for os_inst in os_port_insts:
907             if os_inst['id'] == port_obj.id:
908                 return os_inst['name'] == this_port_name
909     return False