Added method to OpenStackHeatStack to return OpenStackRouter objects.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
35 class NeutronSmokeTests(OSComponentTestCase):
36     """
37     Tests to ensure that the neutron client can communicate with the cloud
38     """
39
40     def test_neutron_connect_success(self):
41         """
42         Tests to ensure that the proper credentials can connect.
43         """
44         neutron = neutron_utils.neutron_client(self.os_creds)
45
46         networks = neutron.list_networks()
47
48         found = False
49         networks = networks.get('networks')
50         for network in networks:
51             if network.get('name') == self.ext_net_name:
52                 found = True
53         self.assertTrue(found)
54
55     def test_neutron_connect_fail(self):
56         """
57         Tests to ensure that the improper credentials cannot connect.
58         """
59         from snaps.openstack.os_credentials import OSCreds
60
61         with self.assertRaises(Exception):
62             neutron = neutron_utils.neutron_client(
63                 OSCreds(username='user', password='pass', auth_url='url',
64                         project_name='project'))
65             neutron.list_networks()
66
67     def test_retrieve_ext_network_name(self):
68         """
69         Tests the neutron_utils.get_external_network_names to ensure the
70         configured self.ext_net_name is contained within the returned list
71         :return:
72         """
73         neutron = neutron_utils.neutron_client(self.os_creds)
74         ext_networks = neutron_utils.get_external_networks(neutron)
75         found = False
76         for network in ext_networks:
77             if network.name == self.ext_net_name:
78                 found = True
79                 break
80         self.assertTrue(found)
81
82
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
84     """
85     Test for creating networks via neutron_utils.py
86     """
87
88     def setUp(self):
89         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90         self.port_name = str(guid) + '-port'
91         self.neutron = neutron_utils.neutron_client(self.os_creds)
92         self.network = None
93         self.net_config = openstack_tests.get_pub_net_config(
94             net_name=guid + '-pub-net')
95
96     def tearDown(self):
97         """
98         Cleans the remote OpenStack objects
99         """
100         if self.network:
101             neutron_utils.delete_network(self.neutron, self.network)
102
103     def test_create_network(self):
104         """
105         Tests the neutron_utils.create_neutron_net() function
106         """
107         self.network = neutron_utils.create_network(
108             self.neutron, self.os_creds, self.net_config.network_settings)
109         self.assertEqual(self.net_config.network_settings.name,
110                          self.network.name)
111         self.assertTrue(validate_network(
112             self.neutron, self.net_config.network_settings.name, True))
113
114     def test_create_network_empty_name(self):
115         """
116         Tests the neutron_utils.create_neutron_net() function with an empty
117         network name
118         """
119         with self.assertRaises(Exception):
120             self.network = neutron_utils.create_network(
121                 self.neutron, self.os_creds,
122                 network_settings=NetworkSettings(name=''))
123
124     def test_create_network_null_name(self):
125         """
126         Tests the neutron_utils.create_neutron_net() function when the network
127         name is None
128         """
129         with self.assertRaises(Exception):
130             self.network = neutron_utils.create_network(
131                 self.neutron, self.os_creds,
132                 network_settings=NetworkSettings())
133
134
135 class NeutronUtilsSubnetTests(OSComponentTestCase):
136     """
137     Test for creating networks with subnets via neutron_utils.py
138     """
139
140     def setUp(self):
141         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
142         self.port_name = str(guid) + '-port'
143         self.neutron = neutron_utils.neutron_client(self.os_creds)
144         self.network = None
145         self.subnet = None
146         self.net_config = openstack_tests.get_pub_net_config(
147             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
148             external_net=self.ext_net_name)
149
150     def tearDown(self):
151         """
152         Cleans the remote OpenStack objects
153         """
154         if self.subnet:
155             try:
156                 neutron_utils.delete_subnet(self.neutron, self.subnet)
157             except:
158                 pass
159         if self.network:
160             try:
161                 neutron_utils.delete_network(self.neutron, self.network)
162             except:
163                 pass
164
165     def test_create_subnet(self):
166         """
167         Tests the neutron_utils.create_neutron_net() function
168         """
169         self.network = neutron_utils.create_network(
170             self.neutron, self.os_creds, self.net_config.network_settings)
171         self.assertEqual(self.net_config.network_settings.name,
172                          self.network.name)
173         self.assertTrue(validate_network(
174             self.neutron, self.net_config.network_settings.name, True))
175
176         subnet_setting = self.net_config.network_settings.subnet_settings[0]
177         self.subnet = neutron_utils.create_subnet(
178             self.neutron, subnet_setting, self.os_creds, network=self.network)
179         self.assertTrue(validate_subnet(
180             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
181
182         subnet_query1 = neutron_utils.get_subnet(
183             self.neutron, subnet_name=subnet_setting.name)
184         self.assertEqual(self.subnet, subnet_query1)
185
186         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
187                                                              self.network)
188         self.assertIsNotNone(subnet_query2)
189         self.assertEqual(1, len(subnet_query2))
190         self.assertEqual(self.subnet, subnet_query2[0])
191
192     def test_create_subnet_null_name(self):
193         """
194         Tests the neutron_utils.create_neutron_subnet() function for an
195         Exception when the subnet name is None
196         """
197         self.network = neutron_utils.create_network(
198             self.neutron, self.os_creds, self.net_config.network_settings)
199         self.assertEqual(self.net_config.network_settings.name,
200                          self.network.name)
201         self.assertTrue(validate_network(
202             self.neutron, self.net_config.network_settings.name, True))
203
204         with self.assertRaises(Exception):
205             SubnetSettings(cidr=self.net_config.subnet_cidr)
206
207     def test_create_subnet_empty_name(self):
208         """
209         Tests the neutron_utils.create_neutron_net() function with an empty
210         name
211         """
212         self.network = neutron_utils.create_network(
213             self.neutron, self.os_creds, self.net_config.network_settings)
214         self.assertEqual(self.net_config.network_settings.name,
215                          self.network.name)
216         self.assertTrue(validate_network(
217             self.neutron, self.net_config.network_settings.name, True))
218
219         subnet_setting = self.net_config.network_settings.subnet_settings[0]
220         self.subnet = neutron_utils.create_subnet(
221             self.neutron, subnet_setting, self.os_creds, network=self.network)
222         self.assertTrue(validate_subnet(
223             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
224         self.assertFalse(validate_subnet(
225             self.neutron, '', subnet_setting.cidr, True))
226
227         subnet_query1 = neutron_utils.get_subnet(
228             self.neutron, subnet_name=subnet_setting.name)
229         self.assertEqual(self.subnet, subnet_query1)
230
231         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
232                                                              self.network)
233         self.assertIsNotNone(subnet_query2)
234         self.assertEqual(1, len(subnet_query2))
235         self.assertEqual(self.subnet, subnet_query2[0])
236
237     def test_create_subnet_null_cidr(self):
238         """
239         Tests the neutron_utils.create_neutron_subnet() function for an
240         Exception when the subnet CIDR value is None
241         """
242         self.network = neutron_utils.create_network(
243             self.neutron, self.os_creds, self.net_config.network_settings)
244         self.assertEqual(self.net_config.network_settings.name,
245                          self.network.name)
246         self.assertTrue(validate_network(
247             self.neutron, self.net_config.network_settings.name, True))
248
249         with self.assertRaises(Exception):
250             sub_sets = SubnetSettings(
251                 cidr=None, name=self.net_config.subnet_name)
252             neutron_utils.create_subnet(
253                 self.neutron, sub_sets, self.os_creds, network=self.network)
254
255     def test_create_subnet_empty_cidr(self):
256         """
257         Tests the neutron_utils.create_neutron_subnet() function for an
258         Exception when the subnet CIDR value is empty
259         """
260         self.network = neutron_utils.create_network(
261             self.neutron, self.os_creds, self.net_config.network_settings)
262         self.assertEqual(self.net_config.network_settings.name,
263                          self.network.name)
264         self.assertTrue(validate_network(
265             self.neutron, self.net_config.network_settings.name, True))
266
267         with self.assertRaises(Exception):
268             sub_sets = SubnetSettings(
269                 cidr='', name=self.net_config.subnet_name)
270             neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
271                                         network=self.network)
272
273
274 class NeutronUtilsRouterTests(OSComponentTestCase):
275     """
276     Test for creating routers via neutron_utils.py
277     """
278
279     def setUp(self):
280         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
281         self.port_name = str(guid) + '-port'
282         self.neutron = neutron_utils.neutron_client(self.os_creds)
283         self.network = None
284         self.subnet = None
285         self.port = None
286         self.router = None
287         self.interface_router = None
288         self.net_config = openstack_tests.get_pub_net_config(
289             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
290             router_name=guid + '-pub-router', external_net=self.ext_net_name)
291
292     def tearDown(self):
293         """
294         Cleans the remote OpenStack objects
295         """
296         if self.interface_router:
297             neutron_utils.remove_interface_router(self.neutron, self.router,
298                                                   self.subnet)
299
300         if self.router:
301             try:
302                 neutron_utils.delete_router(self.neutron, self.router)
303                 validate_router(self.neutron, self.router.name, False)
304             except:
305                 pass
306
307         if self.port:
308             try:
309                 neutron_utils.delete_port(self.neutron, self.port)
310             except:
311                 pass
312
313         if self.subnet:
314             try:
315                 neutron_utils.delete_subnet(self.neutron, self.subnet)
316             except:
317                 pass
318
319         if self.network:
320             try:
321                 neutron_utils.delete_network(self.neutron, self.network)
322             except:
323                 pass
324
325     def test_create_router_simple(self):
326         """
327         Tests the neutron_utils.create_neutron_net() function when an external
328         gateway is requested
329         """
330         self.router = neutron_utils.create_router(
331             self.neutron, self.os_creds, self.net_config.router_settings)
332         validate_router(self.neutron, self.net_config.router_settings.name,
333                         True)
334
335     def test_create_router_with_public_interface(self):
336         """
337         Tests the neutron_utils.create_neutron_net() function when an external
338         gateway is requested
339         """
340         subnet_setting = self.net_config.network_settings.subnet_settings[0]
341         self.net_config = openstack_tests.OSNetworkConfig(
342             self.net_config.network_settings.name,
343             subnet_setting.name,
344             subnet_setting.cidr,
345             self.net_config.router_settings.name,
346             self.ext_net_name)
347         self.router = neutron_utils.create_router(
348             self.neutron, self.os_creds, self.net_config.router_settings)
349         validate_router(self.neutron, self.net_config.router_settings.name,
350                         True)
351
352         ext_net = neutron_utils.get_network(
353             self.neutron, network_name=self.ext_net_name)
354         self.assertEqual(
355             self.router.external_network_id, ext_net.id)
356
357     def test_create_router_empty_name(self):
358         """
359         Tests the neutron_utils.create_neutron_net() function
360         """
361         with self.assertRaises(Exception):
362             this_router_settings = create_router.RouterSettings(name='')
363             self.router = neutron_utils.create_router(self.neutron,
364                                                       self.os_creds,
365                                                       this_router_settings)
366
367     def test_create_router_null_name(self):
368         """
369         Tests the neutron_utils.create_neutron_subnet() function when the
370         subnet CIDR value is None
371         """
372         with self.assertRaises(Exception):
373             this_router_settings = create_router.RouterSettings()
374             self.router = neutron_utils.create_router(self.neutron,
375                                                       self.os_creds,
376                                                       this_router_settings)
377             validate_router(self.neutron, None, True)
378
379     def test_add_interface_router(self):
380         """
381         Tests the neutron_utils.add_interface_router() function
382         """
383         self.network = neutron_utils.create_network(
384             self.neutron, self.os_creds, self.net_config.network_settings)
385         self.assertEqual(self.net_config.network_settings.name,
386                          self.network.name)
387         self.assertTrue(validate_network(
388             self.neutron, self.net_config.network_settings.name, True))
389
390         subnet_setting = self.net_config.network_settings.subnet_settings[0]
391         self.subnet = neutron_utils.create_subnet(
392             self.neutron, subnet_setting,
393             self.os_creds, self.network)
394         self.assertTrue(validate_subnet(
395             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
396
397         self.router = neutron_utils.create_router(
398             self.neutron, self.os_creds, self.net_config.router_settings)
399         validate_router(self.neutron, self.net_config.router_settings.name,
400                         True)
401
402         self.interface_router = neutron_utils.add_interface_router(
403             self.neutron, self.router, self.subnet)
404         validate_interface_router(self.interface_router, self.router,
405                                   self.subnet)
406
407     def test_add_interface_router_null_router(self):
408         """
409         Tests the neutron_utils.add_interface_router() function for an
410         Exception when the router value is None
411         """
412         self.network = neutron_utils.create_network(
413             self.neutron, self.os_creds, self.net_config.network_settings)
414         self.assertEqual(self.net_config.network_settings.name,
415                          self.network.name)
416         self.assertTrue(validate_network(
417             self.neutron, self.net_config.network_settings.name, True))
418
419         subnet_setting = self.net_config.network_settings.subnet_settings[0]
420         self.subnet = neutron_utils.create_subnet(
421             self.neutron, subnet_setting,
422             self.os_creds, self.network)
423         self.assertTrue(validate_subnet(
424             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
425
426         with self.assertRaises(NeutronException):
427             self.interface_router = neutron_utils.add_interface_router(
428                 self.neutron, self.router, self.subnet)
429
430     def test_add_interface_router_null_subnet(self):
431         """
432         Tests the neutron_utils.add_interface_router() function for an
433         Exception when the subnet value is None
434         """
435         self.network = neutron_utils.create_network(
436             self.neutron, self.os_creds, self.net_config.network_settings)
437         self.assertEqual(self.net_config.network_settings.name,
438                          self.network.name)
439         self.assertTrue(validate_network(
440             self.neutron, self.net_config.network_settings.name, True))
441
442         self.router = neutron_utils.create_router(
443             self.neutron, self.os_creds, self.net_config.router_settings)
444         validate_router(self.neutron, self.net_config.router_settings.name,
445                         True)
446
447         with self.assertRaises(NeutronException):
448             self.interface_router = neutron_utils.add_interface_router(
449                 self.neutron, self.router, self.subnet)
450
451     def test_create_port(self):
452         """
453         Tests the neutron_utils.create_port() function
454         """
455         self.network = neutron_utils.create_network(
456             self.neutron, self.os_creds, self.net_config.network_settings)
457         self.assertEqual(self.net_config.network_settings.name,
458                          self.network.name)
459         self.assertTrue(validate_network(
460             self.neutron, self.net_config.network_settings.name, True))
461
462         subnet_setting = self.net_config.network_settings.subnet_settings[0]
463         self.subnet = neutron_utils.create_subnet(
464             self.neutron, subnet_setting, self.os_creds, self.network)
465         self.assertTrue(validate_subnet(
466             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
467
468         self.port = neutron_utils.create_port(
469             self.neutron, self.os_creds, PortSettings(
470                 name=self.port_name,
471                 ip_addrs=[{
472                     'subnet_name': subnet_setting.name,
473                     'ip': ip_1}],
474                 network_name=self.net_config.network_settings.name))
475         validate_port(self.neutron, self.port, self.port_name)
476
477     def test_create_port_empty_name(self):
478         """
479         Tests the neutron_utils.create_port() function
480         """
481         self.network = neutron_utils.create_network(
482             self.neutron, self.os_creds, self.net_config.network_settings)
483         self.assertEqual(self.net_config.network_settings.name,
484                          self.network.name)
485         self.assertTrue(validate_network(
486             self.neutron, self.net_config.network_settings.name, True))
487
488         subnet_setting = self.net_config.network_settings.subnet_settings[0]
489         self.subnet = neutron_utils.create_subnet(
490             self.neutron, subnet_setting, self.os_creds, self.network)
491         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
492                                         subnet_setting.cidr, True))
493
494         self.port = neutron_utils.create_port(
495             self.neutron, self.os_creds, PortSettings(
496                 name=self.port_name,
497                 network_name=self.net_config.network_settings.name,
498                 ip_addrs=[{
499                     'subnet_name': subnet_setting.name,
500                     'ip': ip_1}]))
501         validate_port(self.neutron, self.port, self.port_name)
502
503     def test_create_port_null_name(self):
504         """
505         Tests the neutron_utils.create_port() when the port name value is None
506         """
507         self.network = neutron_utils.create_network(
508             self.neutron, self.os_creds, self.net_config.network_settings)
509         self.assertEqual(self.net_config.network_settings.name,
510                          self.network.name)
511         self.assertTrue(validate_network(
512             self.neutron, self.net_config.network_settings.name, True))
513
514         subnet_setting = self.net_config.network_settings.subnet_settings[0]
515         self.subnet = neutron_utils.create_subnet(
516             self.neutron, subnet_setting,
517             self.os_creds, self.network)
518         self.assertTrue(validate_subnet(
519             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
520
521         self.port = neutron_utils.create_port(
522             self.neutron, self.os_creds,
523             PortSettings(
524                 network_name=self.net_config.network_settings.name,
525                 ip_addrs=[{
526                     'subnet_name': subnet_setting.name,
527                     'ip': ip_1}]))
528
529         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
530         self.assertEqual(self.port, port)
531
532     def test_create_port_null_network_object(self):
533         """
534         Tests the neutron_utils.create_port() function for an Exception when
535         the network object is None
536         """
537         with self.assertRaises(Exception):
538             self.port = neutron_utils.create_port(
539                 self.neutron, self.os_creds,
540                 PortSettings(
541                     name=self.port_name,
542                     network_name=self.net_config.network_settings.name,
543                     ip_addrs=[{
544                         'subnet_name':
545                             self.net_config.network_settings.subnet_settings[
546                                 0].name,
547                         'ip': ip_1}]))
548
549     def test_create_port_null_ip(self):
550         """
551         Tests the neutron_utils.create_port() function for an Exception when
552         the IP value is None
553         """
554         self.network = neutron_utils.create_network(
555             self.neutron, self.os_creds, self.net_config.network_settings)
556         self.assertEqual(self.net_config.network_settings.name,
557                          self.network.name)
558         self.assertTrue(validate_network(
559             self.neutron, self.net_config.network_settings.name, True))
560
561         subnet_setting = self.net_config.network_settings.subnet_settings[0]
562         self.subnet = neutron_utils.create_subnet(
563             self.neutron, subnet_setting,
564             self.os_creds, self.network)
565         self.assertTrue(validate_subnet(
566             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
567
568         with self.assertRaises(Exception):
569             self.port = neutron_utils.create_port(
570                 self.neutron, self.os_creds,
571                 PortSettings(
572                     name=self.port_name,
573                     network_name=self.net_config.network_settings.name,
574                     ip_addrs=[{
575                         'subnet_name': subnet_setting.name,
576                         'ip': None}]))
577
578     def test_create_port_invalid_ip(self):
579         """
580         Tests the neutron_utils.create_port() function for an Exception when
581         the IP value is None
582         """
583         self.network = neutron_utils.create_network(
584             self.neutron, self.os_creds, self.net_config.network_settings)
585         self.assertEqual(self.net_config.network_settings.name,
586                          self.network.name)
587         self.assertTrue(validate_network(
588             self.neutron, self.net_config.network_settings.name, True))
589
590         subnet_setting = self.net_config.network_settings.subnet_settings[0]
591         self.subnet = neutron_utils.create_subnet(
592             self.neutron, subnet_setting, self.os_creds, self.network)
593         self.assertTrue(validate_subnet(
594             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
595
596         with self.assertRaises(Exception):
597             self.port = neutron_utils.create_port(
598                 self.neutron, self.os_creds,
599                 PortSettings(
600                     name=self.port_name,
601                     network_name=self.net_config.network_settings.name,
602                     ip_addrs=[{
603                         'subnet_name': subnet_setting.name,
604                         'ip': 'foo'}]))
605
606     def test_create_port_invalid_ip_to_subnet(self):
607         """
608         Tests the neutron_utils.create_port() function for an Exception when
609         the IP value is None
610         """
611         self.network = neutron_utils.create_network(
612             self.neutron, self.os_creds, self.net_config.network_settings)
613         self.assertEqual(self.net_config.network_settings.name,
614                          self.network.name)
615         self.assertTrue(validate_network(
616             self.neutron, self.net_config.network_settings.name, True))
617
618         subnet_setting = self.net_config.network_settings.subnet_settings[0]
619         self.subnet = neutron_utils.create_subnet(
620             self.neutron, subnet_setting, self.os_creds, self.network)
621         self.assertTrue(validate_subnet(
622             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
623
624         with self.assertRaises(Exception):
625             self.port = neutron_utils.create_port(
626                 self.neutron, self.os_creds,
627                 PortSettings(
628                     name=self.port_name,
629                     network_name=self.net_config.network_settings.name,
630                     ip_addrs=[{
631                         'subnet_name': subnet_setting.name,
632                         'ip': '10.197.123.100'}]))
633
634
635 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
636     """
637     Test for creating security groups via neutron_utils.py
638     """
639
640     def setUp(self):
641         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
642         self.sec_grp_name = guid + 'name'
643
644         self.security_groups = list()
645         self.security_group_rules = list()
646         self.neutron = neutron_utils.neutron_client(self.os_creds)
647         self.keystone = keystone_utils.keystone_client(self.os_creds)
648
649     def tearDown(self):
650         """
651         Cleans the remote OpenStack objects
652         """
653         for rule in self.security_group_rules:
654             neutron_utils.delete_security_group_rule(self.neutron, rule)
655
656         for security_group in self.security_groups:
657             try:
658                 neutron_utils.delete_security_group(self.neutron,
659                                                     security_group)
660             except:
661                 pass
662
663     def test_create_delete_simple_sec_grp(self):
664         """
665         Tests the neutron_utils.create_security_group() function
666         """
667         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
668         security_group = neutron_utils.create_security_group(self.neutron,
669                                                              self.keystone,
670                                                              sec_grp_settings)
671
672         self.assertTrue(sec_grp_settings.name, security_group.name)
673
674         sec_grp_get = neutron_utils.get_security_group(
675             self.neutron, sec_grp_settings=sec_grp_settings)
676         self.assertIsNotNone(sec_grp_get)
677         self.assertTrue(validation_utils.objects_equivalent(
678             security_group, sec_grp_get))
679
680         neutron_utils.delete_security_group(self.neutron, security_group)
681         sec_grp_get = neutron_utils.get_security_group(
682             self.neutron, sec_grp_settings=sec_grp_settings)
683         self.assertIsNone(sec_grp_get)
684
685     def test_create_sec_grp_no_name(self):
686         """
687         Tests the SecurityGroupSettings constructor and
688         neutron_utils.create_security_group() function to ensure that
689         attempting to create a security group without a name will raise an
690         exception
691         """
692         with self.assertRaises(Exception):
693             sec_grp_settings = SecurityGroupSettings()
694             self.security_groups.append(
695                 neutron_utils.create_security_group(self.neutron,
696                                                     self.keystone,
697                                                     sec_grp_settings))
698
699     def test_create_sec_grp_no_rules(self):
700         """
701         Tests the neutron_utils.create_security_group() function
702         """
703         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
704                                                  description='hello group')
705         self.security_groups.append(
706             neutron_utils.create_security_group(self.neutron, self.keystone,
707                                                 sec_grp_settings))
708
709         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
710         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
711
712         sec_grp_get = neutron_utils.get_security_group(
713             self.neutron, sec_grp_settings=sec_grp_settings)
714         self.assertIsNotNone(sec_grp_get)
715         self.assertEqual(self.security_groups[0], sec_grp_get)
716
717     def test_create_sec_grp_one_rule(self):
718         """
719         Tests the neutron_utils.create_security_group() function
720         """
721
722         sec_grp_rule_settings = SecurityGroupRuleSettings(
723             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
724         sec_grp_settings = SecurityGroupSettings(
725             name=self.sec_grp_name, description='hello group',
726             rule_settings=[sec_grp_rule_settings])
727
728         self.security_groups.append(
729             neutron_utils.create_security_group(self.neutron, self.keystone,
730                                                 sec_grp_settings))
731         free_rules = neutron_utils.get_rules_by_security_group(
732             self.neutron, self.security_groups[0])
733         for free_rule in free_rules:
734             self.security_group_rules.append(free_rule)
735
736         self.security_group_rules.append(
737             neutron_utils.create_security_group_rule(
738                 self.neutron, sec_grp_settings.rule_settings[0]))
739
740         # Refresh object so it is populated with the newly added rule
741         security_group = neutron_utils.get_security_group(
742             self.neutron, sec_grp_settings=sec_grp_settings)
743
744         rules = neutron_utils.get_rules_by_security_group(self.neutron,
745                                                           security_group)
746
747         self.assertTrue(
748             validation_utils.objects_equivalent(
749                  self.security_group_rules, rules))
750
751         self.assertTrue(sec_grp_settings.name, security_group.name)
752
753         sec_grp_get = neutron_utils.get_security_group(
754             self.neutron, sec_grp_settings=sec_grp_settings)
755         self.assertIsNotNone(sec_grp_get)
756         self.assertEqual(security_group, sec_grp_get)
757
758     def test_get_sec_grp_by_id(self):
759         """
760         Tests the neutron_utils.create_security_group() function
761         """
762
763         self.security_groups.append(neutron_utils.create_security_group(
764             self.neutron, self.keystone,
765             SecurityGroupSettings(name=self.sec_grp_name + '-1',
766                                   description='hello group')))
767         self.security_groups.append(neutron_utils.create_security_group(
768             self.neutron, self.keystone,
769             SecurityGroupSettings(name=self.sec_grp_name + '-2',
770                                   description='hello group')))
771
772         sec_grp_1b = neutron_utils.get_security_group_by_id(
773             self.neutron, self.security_groups[0].id)
774         sec_grp_2b = neutron_utils.get_security_group_by_id(
775             self.neutron, self.security_groups[1].id)
776
777         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
778         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
779
780
781 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
782     """
783     Test basic nova keypair functionality
784     """
785
786     def setUp(self):
787         """
788         Instantiates the CreateImage object that is responsible for downloading
789         and creating an OS image file within OpenStack
790         """
791         self.neutron = neutron_utils.neutron_client(self.os_creds)
792         self.floating_ip = None
793
794     def tearDown(self):
795         """
796         Cleans the image and downloaded image file
797         """
798         if self.floating_ip:
799             try:
800                 neutron_utils.delete_floating_ip(
801                     self.neutron, self.floating_ip)
802             except:
803                 pass
804
805     def test_floating_ips(self):
806         """
807         Tests the creation of a floating IP
808         :return:
809         """
810         initial_fips = neutron_utils.get_floating_ips(self.neutron)
811
812         self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
813                                                             self.ext_net_name)
814         all_fips = neutron_utils.get_floating_ips(self.neutron)
815         self.assertEqual(len(initial_fips) + 1, len(all_fips))
816         returned = neutron_utils.get_floating_ip(self.neutron,
817                                                  self.floating_ip)
818         self.assertEqual(self.floating_ip.id, returned.id)
819         self.assertEqual(self.floating_ip.ip, returned.ip)
820
821
822 """
823 Validation routines
824 """
825
826
827 def validate_network(neutron, name, exists):
828     """
829     Returns true if a network for a given name DOES NOT exist if the exists
830     parameter is false conversely true. Returns false if a network for a given
831     name DOES exist if the exists parameter is true conversely false.
832     :param neutron: The neutron client
833     :param name: The expected network name
834     :param exists: Whether or not the network name should exist or not
835     :return: True/False
836     """
837     network = neutron_utils.get_network(neutron, network_name=name)
838     if exists and network:
839         return True
840     if not exists and not network:
841         return True
842     return False
843
844
845 def validate_subnet(neutron, name, cidr, exists):
846     """
847     Returns true if a subnet for a given name DOES NOT exist if the exists
848     parameter is false conversely true. Returns false if a subnet for a given
849     name DOES exist if the exists parameter is true conversely false.
850     :param neutron: The neutron client
851     :param name: The expected subnet name
852     :param cidr: The expected CIDR value
853     :param exists: Whether or not the network name should exist or not
854     :return: True/False
855     """
856     subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
857     if exists and subnet and subnet.name == name:
858         return subnet.cidr == cidr
859     if not exists and not subnet:
860         return True
861     return False
862
863
864 def validate_router(neutron, name, exists):
865     """
866     Returns true if a router for a given name DOES NOT exist if the exists
867     parameter is false conversely true. Returns false if a router for a given
868     name DOES exist if the exists parameter is true conversely false.
869     :param neutron: The neutron client
870     :param name: The expected router name
871     :param exists: Whether or not the network name should exist or not
872     :return: True/False
873     """
874     router = neutron_utils.get_router(neutron, router_name=name)
875     if exists and router:
876         return True
877     return False
878
879
880 def validate_interface_router(interface_router, router, subnet):
881     """
882     Returns true if the router ID & subnet ID have been properly included into
883     the interface router object
884     :param interface_router: the SNAPS-OO InterfaceRouter domain object
885     :param router: to validate against the interface_router
886     :param subnet: to validate against the interface_router
887     :return: True if both IDs match else False
888     """
889     subnet_id = interface_router.subnet_id
890     router_id = interface_router.port_id
891
892     return subnet.id == subnet_id and router.id == router_id
893
894
895 def validate_port(neutron, port_obj, this_port_name):
896     """
897     Returns true if a port for a given name DOES NOT exist if the exists
898     parameter is false conversely true. Returns false if a port for a given
899     name DOES exist if the exists parameter is true conversely false.
900     :param neutron: The neutron client
901     :param port_obj: The port object to lookup
902     :param this_port_name: The expected router name
903     :return: True/False
904     """
905     os_ports = neutron.list_ports()
906     for os_port, os_port_insts in os_ports.items():
907         for os_inst in os_port_insts:
908             if os_inst['id'] == port_obj.id:
909                 return os_inst['name'] == this_port_name
910     return False