Closing keystone sessions after done with them.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21     SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
35 class NeutronSmokeTests(OSComponentTestCase):
36     """
37     Tests to ensure that the neutron client can communicate with the cloud
38     """
39
40     def test_neutron_connect_success(self):
41         """
42         Tests to ensure that the proper credentials can connect.
43         """
44         neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
45
46         networks = neutron.list_networks()
47
48         found = False
49         networks = networks.get('networks')
50         for network in networks:
51             if network.get('name') == self.ext_net_name:
52                 found = True
53         self.assertTrue(found)
54
55     def test_neutron_connect_fail(self):
56         """
57         Tests to ensure that the improper credentials cannot connect.
58         """
59         from snaps.openstack.os_credentials import OSCreds
60
61         with self.assertRaises(Exception):
62             neutron = neutron_utils.neutron_client(
63                 OSCreds(username='user', password='pass', auth_url='url',
64                         project_name='project'))
65             neutron.list_networks()
66
67     def test_retrieve_ext_network_name(self):
68         """
69         Tests the neutron_utils.get_external_network_names to ensure the
70         configured self.ext_net_name is contained within the returned list
71         :return:
72         """
73         neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
74         ext_networks = neutron_utils.get_external_networks(neutron)
75         found = False
76         for network in ext_networks:
77             if network.name == self.ext_net_name:
78                 found = True
79                 break
80         self.assertTrue(found)
81
82
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
84     """
85     Test for creating networks via neutron_utils.py
86     """
87
88     def setUp(self):
89         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90         self.port_name = str(guid) + '-port'
91         self.neutron = neutron_utils.neutron_client(
92             self.os_creds, self.os_session)
93         self.keystone = keystone_utils.keystone_client(
94             self.os_creds, self.os_session)
95         self.network = None
96         self.net_config = openstack_tests.get_pub_net_config(
97             net_name=guid + '-pub-net')
98
99     def tearDown(self):
100         """
101         Cleans the remote OpenStack objects
102         """
103         if self.network:
104             neutron_utils.delete_network(self.neutron, self.network)
105
106         super(self.__class__, self).__clean__()
107
108     def test_create_network(self):
109         """
110         Tests the neutron_utils.create_network() function
111         """
112         self.network = neutron_utils.create_network(
113             self.neutron, self.os_creds, self.net_config.network_settings)
114         self.assertEqual(self.net_config.network_settings.name,
115                          self.network.name)
116         self.assertTrue(validate_network(
117             self.neutron, self.keystone,
118             self.net_config.network_settings.name, True,
119             self.os_creds.project_name))
120         self.assertEqual(len(self.net_config.network_settings.subnet_settings),
121                          len(self.network.subnets))
122
123     def test_create_network_empty_name(self):
124         """
125         Tests the neutron_utils.create_network() function with an empty
126         network name
127         """
128         with self.assertRaises(Exception):
129             self.network = neutron_utils.create_network(
130                 self.neutron, self.os_creds,
131                 network_settings=NetworkConfig(name=''))
132
133     def test_create_network_null_name(self):
134         """
135         Tests the neutron_utils.create_network() function when the network
136         name is None
137         """
138         with self.assertRaises(Exception):
139             self.network = neutron_utils.create_network(
140                 self.neutron, self.os_creds,
141                 network_settings=NetworkConfig())
142
143
144 class NeutronUtilsSubnetTests(OSComponentTestCase):
145     """
146     Test for creating networks with subnets via neutron_utils.py
147     """
148
149     def setUp(self):
150         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
151         self.port_name = str(guid) + '-port'
152         self.neutron = neutron_utils.neutron_client(
153             self.os_creds, self.os_session)
154         self.keystone = keystone_utils.keystone_client(
155             self.os_creds, self.os_session)
156         self.network = None
157         self.net_config = openstack_tests.get_pub_net_config(
158             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
159             external_net=self.ext_net_name)
160
161     def tearDown(self):
162         """
163         Cleans the remote OpenStack objects
164         """
165         if self.network:
166             try:
167                 neutron_utils.delete_network(self.neutron, self.network)
168             except:
169                 pass
170
171         super(self.__class__, self).__clean__()
172
173     def test_create_subnet(self):
174         """
175         Tests the neutron_utils.create_network() function
176         """
177         self.network = neutron_utils.create_network(
178             self.neutron, self.os_creds, self.net_config.network_settings)
179         self.assertEqual(self.net_config.network_settings.name,
180                          self.network.name)
181         self.assertTrue(validate_network(
182             self.neutron, self.keystone,
183             self.net_config.network_settings.name, True,
184             self.os_creds.project_name))
185
186         subnet_setting = self.net_config.network_settings.subnet_settings[0]
187         self.assertTrue(validate_subnet(
188             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
189
190         subnet_query1 = neutron_utils.get_subnet(
191             self.neutron, subnet_name=subnet_setting.name)
192         self.assertEqual(self.network.subnets[0], subnet_query1)
193
194         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
195                                                              self.network)
196         self.assertIsNotNone(subnet_query2)
197         self.assertEqual(1, len(subnet_query2))
198         self.assertEqual(self.network.subnets[0], subnet_query2[0])
199
200     def test_create_subnet_null_name(self):
201         """
202         Tests the neutron_utils.create_neutron_subnet() function for an
203         Exception when the subnet name is None
204         """
205         self.network = neutron_utils.create_network(
206             self.neutron, self.os_creds, self.net_config.network_settings)
207         self.assertEqual(self.net_config.network_settings.name,
208                          self.network.name)
209         self.assertTrue(validate_network(
210             self.neutron, self.keystone,
211             self.net_config.network_settings.name, True,
212             self.os_creds.project_name))
213
214         with self.assertRaises(Exception):
215             SubnetConfig(cidr=self.net_config.subnet_cidr)
216
217     def test_create_subnet_empty_name(self):
218         """
219         Tests the neutron_utils.create_network() function with an empty
220         name
221         """
222         self.network = neutron_utils.create_network(
223             self.neutron, self.os_creds, self.net_config.network_settings)
224         self.assertEqual(self.net_config.network_settings.name,
225                          self.network.name)
226         self.assertTrue(validate_network(
227             self.neutron, self.keystone,
228             self.net_config.network_settings.name, True,
229             self.os_creds.project_name))
230
231         subnet_setting = self.net_config.network_settings.subnet_settings[0]
232         self.assertTrue(validate_subnet(
233             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
234         self.assertFalse(validate_subnet(
235             self.neutron, '', subnet_setting.cidr, True))
236
237         subnet_query1 = neutron_utils.get_subnet(
238             self.neutron, subnet_name=subnet_setting.name)
239         self.assertEqual(self.network.subnets[0], subnet_query1)
240
241         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
242                                                              self.network)
243         self.assertIsNotNone(subnet_query2)
244         self.assertEqual(1, len(subnet_query2))
245         self.assertEqual(self.network.subnets[0], subnet_query2[0])
246
247     def test_create_subnet_null_cidr(self):
248         """
249         Tests the neutron_utils.create_neutron_subnet() function for an
250         Exception when the subnet CIDR value is None
251         """
252         self.net_config.network_settings.subnet_settings[0].cidr = None
253         with self.assertRaises(Exception):
254             self.network = neutron_utils.create_network(
255                 self.neutron, self.os_creds, self.net_config.network_settings)
256
257     def test_create_subnet_empty_cidr(self):
258         """
259         Tests the neutron_utils.create_neutron_subnet() function for an
260         Exception when the subnet CIDR value is empty
261         """
262         self.net_config.network_settings.subnet_settings[0].cidr = ''
263         with self.assertRaises(Exception):
264             self.network = neutron_utils.create_network(
265                 self.neutron, self.os_creds, self.net_config.network_settings)
266
267
268 class NeutronUtilsIPv6Tests(OSComponentTestCase):
269     """
270     Test for creating IPv6 networks with subnets via neutron_utils.py
271     """
272
273     def setUp(self):
274         self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
275         self.neutron = neutron_utils.neutron_client(
276             self.os_creds, self.os_session)
277         self.network = None
278
279     def tearDown(self):
280         """
281         Cleans the remote OpenStack objects
282         """
283         if self.network:
284             try:
285                 neutron_utils.delete_network(self.neutron, self.network)
286             except:
287                 pass
288
289         super(self.__class__, self).__clean__()
290
291     def test_create_network_slaac(self):
292         """
293         Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
294         is True and IPv6 modes are slaac
295         """
296         sub_setting = SubnetConfig(
297             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
298             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
299             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
300             enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
301         self.network_settings = NetworkConfig(
302             name=self.guid + '-net', subnet_settings=[sub_setting])
303
304         self.network = neutron_utils.create_network(
305             self.neutron, self.os_creds, self.network_settings)
306         self.assertEqual(self.network_settings.name, self.network.name)
307
308         subnet_settings = self.network_settings.subnet_settings[0]
309         self.assertEqual(1, len(self.network.subnets))
310         subnet = self.network.subnets[0]
311
312         self.assertEqual(self.network.id, subnet.network_id)
313         self.assertEqual(subnet_settings.name, subnet.name)
314         self.assertEqual(subnet_settings.start, subnet.start)
315         self.assertEqual(subnet_settings.end, subnet.end)
316         self.assertEqual('1:1::/64', subnet.cidr)
317         self.assertEqual(6, subnet.ip_version)
318         self.assertEqual(1, len(subnet.dns_nameservers))
319         self.assertEqual(
320             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
321         self.assertTrue(subnet.enable_dhcp)
322         self.assertEqual(
323             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
324         self.assertEqual(
325             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
326
327     def test_create_network_stateful(self):
328         """
329         Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
330         is True and IPv6 modes are stateful
331         """
332         sub_setting = SubnetConfig(
333             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
334             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
335             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
336             enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
337             ipv6_address_mode='dhcpv6-stateful')
338         self.network_settings = NetworkConfig(
339             name=self.guid + '-net', subnet_settings=[sub_setting])
340
341         self.network = neutron_utils.create_network(
342             self.neutron, self.os_creds, self.network_settings)
343
344         self.assertEqual(self.network_settings.name, self.network.name)
345
346         subnet_settings = self.network_settings.subnet_settings[0]
347         self.assertEqual(1, len(self.network.subnets))
348         subnet = self.network.subnets[0]
349
350         self.assertEqual(self.network.id, subnet.network_id)
351         self.assertEqual(subnet_settings.name, subnet.name)
352         self.assertEqual(subnet_settings.start, subnet.start)
353         self.assertEqual(subnet_settings.end, subnet.end)
354         self.assertEqual('1:1::/64', subnet.cidr)
355         self.assertEqual(6, subnet.ip_version)
356         self.assertEqual(1, len(subnet.dns_nameservers))
357         self.assertEqual(
358             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
359         self.assertTrue(subnet.enable_dhcp)
360         self.assertEqual(
361             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
362         self.assertEqual(
363             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
364
365     def test_create_network_stateless(self):
366         """
367         Tests the neutron_utils.create_network() when DHCP is enabled and
368         the RA and address modes are both 'slaac'
369         """
370         sub_setting = SubnetConfig(
371             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
372             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
373             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
374             enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
375             ipv6_address_mode='dhcpv6-stateless')
376         self.network_settings = NetworkConfig(
377             name=self.guid + '-net', subnet_settings=[sub_setting])
378
379         self.network = neutron_utils.create_network(
380             self.neutron, self.os_creds, self.network_settings)
381
382         self.assertEqual(self.network_settings.name, self.network.name)
383
384         subnet_settings = self.network_settings.subnet_settings[0]
385         self.assertEqual(1, len(self.network.subnets))
386         subnet = self.network.subnets[0]
387
388         self.assertEqual(self.network.id, subnet.network_id)
389         self.assertEqual(subnet_settings.name, subnet.name)
390         self.assertEqual(subnet_settings.start, subnet.start)
391         self.assertEqual(subnet_settings.end, subnet.end)
392         self.assertEqual('1:1::/64', subnet.cidr)
393         self.assertEqual(6, subnet.ip_version)
394         self.assertEqual(1, len(subnet.dns_nameservers))
395         self.assertEqual(
396             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
397         self.assertTrue(subnet.enable_dhcp)
398         self.assertEqual(
399             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
400         self.assertEqual(
401             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
402
403     def test_create_network_no_dhcp_slaac(self):
404         """
405         Tests the neutron_utils.create_network() for a BadRequest when
406         DHCP is not enabled and the RA and address modes are both 'slaac'
407         """
408         sub_setting = SubnetConfig(
409             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
410             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
411             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
412             enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
413         self.network_settings = NetworkConfig(
414             name=self.guid + '-net', subnet_settings=[sub_setting])
415
416         with self.assertRaises(BadRequest):
417             self.network = neutron_utils.create_network(
418                 self.neutron, self.os_creds, self.network_settings)
419
420     def test_create_network_invalid_start_ip(self):
421         """
422         Tests the neutron_utils.create_network() that contains one IPv6 subnet
423         with an invalid start IP to ensure Neutron assigns it the smallest IP
424         possible
425         """
426         sub_setting = SubnetConfig(
427             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
428             start='foo')
429         self.network_settings = NetworkConfig(
430             name=self.guid + '-net', subnet_settings=[sub_setting])
431
432         self.network = neutron_utils.create_network(
433             self.neutron, self.os_creds, self.network_settings)
434
435         self.assertEqual('1:1::2', self.network.subnets[0].start)
436         self.assertEqual(
437             '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
438
439     def test_create_network_invalid_end_ip(self):
440         """
441         Tests the neutron_utils.create_network() that contains one IPv6 subnet
442         with an invalid end IP to ensure Neutron assigns it the largest IP
443         possible
444         """
445         sub_setting = SubnetConfig(
446             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
447             end='bar')
448         self.network_settings = NetworkConfig(
449             name=self.guid + '-net', subnet_settings=[sub_setting])
450
451         self.network = neutron_utils.create_network(
452             self.neutron, self.os_creds, self.network_settings)
453
454         self.assertEqual('1:1::2', self.network.subnets[0].start)
455         self.assertEqual(
456             '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
457
458     def test_create_network_with_bad_cidr(self):
459         """
460         Tests the neutron_utils.create_network() for a BadRequest when
461         the subnet CIDR is invalid
462         """
463         sub_setting = SubnetConfig(
464             name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
465         self.network_settings = NetworkConfig(
466             name=self.guid + '-net', subnet_settings=[sub_setting])
467
468         with self.assertRaises(BadRequest):
469             self.network = neutron_utils.create_network(
470                 self.neutron, self.os_creds, self.network_settings)
471
472     def test_create_network_invalid_gateway_ip(self):
473         """
474         Tests the neutron_utils.create_network() for a BadRequest when
475         the subnet gateway IP is invalid
476         """
477         sub_setting = SubnetConfig(
478             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
479             gateway_ip='192.168.0.1')
480         self.network_settings = NetworkConfig(
481             name=self.guid + '-net', subnet_settings=[sub_setting])
482
483         with self.assertRaises(BadRequest):
484             self.network = neutron_utils.create_network(
485                 self.neutron, self.os_creds, self.network_settings)
486
487     def test_create_network_with_bad_dns(self):
488         """
489         Tests the neutron_utils.create_network() for a BadRequest when
490         the DNS IP is invalid
491         """
492         sub_setting = SubnetConfig(
493             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
494             dns_nameservers=['foo'])
495         self.network_settings = NetworkConfig(
496             name=self.guid + '-net', subnet_settings=[sub_setting])
497
498         with self.assertRaises(BadRequest):
499             self.network = neutron_utils.create_network(
500                     self.neutron, self.os_creds, self.network_settings)
501
502
503 class NeutronUtilsRouterTests(OSComponentTestCase):
504     """
505     Test for creating routers via neutron_utils.py
506     """
507
508     def setUp(self):
509         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
510         self.port_name = str(guid) + '-port'
511         self.neutron = neutron_utils.neutron_client(
512             self.os_creds, self.os_session)
513         self.keystone = keystone_utils.keystone_client(
514             self.os_creds, self.os_session)
515         self.network = None
516         self.port = None
517         self.router = None
518         self.interface_router = None
519         self.net_config = openstack_tests.get_pub_net_config(
520             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
521             router_name=guid + '-pub-router', external_net=self.ext_net_name)
522
523     def tearDown(self):
524         """
525         Cleans the remote OpenStack objects
526         """
527         if self.interface_router:
528             neutron_utils.remove_interface_router(
529                 self.neutron, self.router, self.network.subnets[0])
530
531         if self.router:
532             try:
533                 neutron_utils.delete_router(self.neutron, self.router)
534                 validate_router(
535                     self.neutron, self.keystone, self.router.name,
536                     self.os_creds.project_name, False)
537             except:
538                 pass
539
540         if self.port:
541             try:
542                 neutron_utils.delete_port(self.neutron, self.port)
543             except:
544                 pass
545
546         if self.network:
547             neutron_utils.delete_network(self.neutron, self.network)
548
549         super(self.__class__, self).__clean__()
550
551     def test_create_router_simple(self):
552         """
553         Tests the neutron_utils.create_router()
554         """
555         self.router = neutron_utils.create_router(
556             self.neutron, self.os_creds, self.net_config.router_settings)
557         validate_router(
558             self.neutron, self.keystone, self.net_config.router_settings.name,
559             self.os_creds.project_name, True)
560
561     def test_create_router_with_public_interface(self):
562         """
563         Tests the neutron_utils.create_router() function with a pubic interface
564         """
565         subnet_setting = self.net_config.network_settings.subnet_settings[0]
566         self.net_config = openstack_tests.OSNetworkConfig(
567             self.net_config.network_settings.name, subnet_setting.name,
568             subnet_setting.cidr, self.net_config.router_settings.name,
569             self.ext_net_name)
570         self.router = neutron_utils.create_router(
571             self.neutron, self.os_creds, self.net_config.router_settings)
572         validate_router(
573             self.neutron, self.keystone, self.net_config.router_settings.name,
574             self.os_creds.project_name, True)
575
576         ext_net = neutron_utils.get_network(
577             self.neutron, self.keystone, network_name=self.ext_net_name)
578         self.assertEqual(self.router.external_network_id, ext_net.id)
579
580     def test_add_interface_router(self):
581         """
582         Tests the neutron_utils.add_interface_router() function
583         """
584         self.network = neutron_utils.create_network(
585             self.neutron, self.os_creds, self.net_config.network_settings)
586         self.assertEqual(self.net_config.network_settings.name,
587                          self.network.name)
588         self.assertTrue(validate_network(
589             self.neutron, self.keystone,
590             self.net_config.network_settings.name, True,
591             self.os_creds.project_name))
592
593         subnet_setting = self.net_config.network_settings.subnet_settings[0]
594         self.assertTrue(validate_subnet(
595             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
596
597         self.router = neutron_utils.create_router(
598             self.neutron, self.os_creds, self.net_config.router_settings)
599         validate_router(
600             self.neutron, self.keystone, self.net_config.router_settings.name,
601             self.os_creds.project_name, True)
602
603         self.interface_router = neutron_utils.add_interface_router(
604             self.neutron, self.router, self.network.subnets[0])
605         validate_interface_router(self.interface_router, self.router,
606                                   self.network.subnets[0])
607
608     def test_add_interface_router_null_router(self):
609         """
610         Tests the neutron_utils.add_interface_router() function for an
611         Exception when the router value is None
612         """
613         self.network = neutron_utils.create_network(
614             self.neutron, self.os_creds, self.net_config.network_settings)
615         self.assertEqual(self.net_config.network_settings.name,
616                          self.network.name)
617         self.assertTrue(validate_network(
618             self.neutron, self.keystone,
619             self.net_config.network_settings.name, True,
620             self.os_creds.project_name))
621
622         subnet_setting = self.net_config.network_settings.subnet_settings[0]
623         self.assertTrue(validate_subnet(
624             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
625
626         with self.assertRaises(NeutronException):
627             self.interface_router = neutron_utils.add_interface_router(
628                 self.neutron, self.router, self.network.subnets[0])
629
630     def test_add_interface_router_null_subnet(self):
631         """
632         Tests the neutron_utils.add_interface_router() function for an
633         Exception when the subnet value is None
634         """
635         self.network = neutron_utils.create_network(
636             self.neutron, self.os_creds, self.net_config.network_settings)
637         self.assertEqual(self.net_config.network_settings.name,
638                          self.network.name)
639         self.assertTrue(validate_network(
640             self.neutron, self.keystone,
641             self.net_config.network_settings.name, True,
642             self.os_creds.project_name))
643
644         self.router = neutron_utils.create_router(
645             self.neutron, self.os_creds, self.net_config.router_settings)
646         validate_router(
647             self.neutron, self.keystone, self.net_config.router_settings.name,
648             self.os_creds.project_name, True)
649
650         with self.assertRaises(NeutronException):
651             self.interface_router = neutron_utils.add_interface_router(
652                 self.neutron, self.router, None)
653
654     def test_add_interface_router_missing_subnet(self):
655         """
656         Tests the neutron_utils.add_interface_router() function for an
657         Exception when the subnet object has been deleted
658         """
659         self.network = neutron_utils.create_network(
660             self.neutron, self.os_creds, self.net_config.network_settings)
661         self.assertEqual(self.net_config.network_settings.name,
662                          self.network.name)
663         self.assertTrue(validate_network(
664             self.neutron, self.keystone,
665             self.net_config.network_settings.name, True,
666             self.os_creds.project_name))
667
668         self.router = neutron_utils.create_router(
669             self.neutron, self.os_creds, self.net_config.router_settings)
670         validate_router(
671             self.neutron, self.keystone, self.net_config.router_settings.name,
672             self.os_creds.project_name, True)
673
674         for subnet in self.network.subnets:
675             neutron_utils.delete_subnet(self.neutron, subnet)
676
677         with self.assertRaises(NotFound):
678             self.interface_router = neutron_utils.add_interface_router(
679                 self.neutron, self.router, self.network.subnets[0])
680
681     def test_create_port(self):
682         """
683         Tests the neutron_utils.create_port() function
684         """
685         self.network = neutron_utils.create_network(
686             self.neutron, self.os_creds, self.net_config.network_settings)
687         self.assertEqual(self.net_config.network_settings.name,
688                          self.network.name)
689         self.assertTrue(validate_network(
690             self.neutron, self.keystone,
691             self.net_config.network_settings.name, True,
692             self.os_creds.project_name))
693
694         subnet_setting = self.net_config.network_settings.subnet_settings[0]
695         self.assertTrue(validate_subnet(
696             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
697
698         self.port = neutron_utils.create_port(
699             self.neutron, self.os_creds, PortConfig(
700                 name=self.port_name,
701                 ip_addrs=[{
702                     'subnet_name': subnet_setting.name,
703                     'ip': ip_1}],
704                 network_name=self.net_config.network_settings.name))
705         validate_port(self.neutron, self.port, self.port_name)
706
707     def test_create_port_empty_name(self):
708         """
709         Tests the neutron_utils.create_port() function
710         """
711         self.network = neutron_utils.create_network(
712             self.neutron, self.os_creds, self.net_config.network_settings)
713         self.assertEqual(self.net_config.network_settings.name,
714                          self.network.name)
715         self.assertTrue(validate_network(
716             self.neutron, self.keystone,
717             self.net_config.network_settings.name, True,
718             self.os_creds.project_name))
719
720         subnet_setting = self.net_config.network_settings.subnet_settings[0]
721         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
722                                         subnet_setting.cidr, True))
723
724         self.port = neutron_utils.create_port(
725             self.neutron, self.os_creds, PortConfig(
726                 name=self.port_name,
727                 network_name=self.net_config.network_settings.name,
728                 ip_addrs=[{
729                     'subnet_name': subnet_setting.name,
730                     'ip': ip_1}]))
731         validate_port(self.neutron, self.port, self.port_name)
732
733     def test_create_port_null_name(self):
734         """
735         Tests the neutron_utils.create_port() when the port name value is None
736         """
737         self.network = neutron_utils.create_network(
738             self.neutron, self.os_creds, self.net_config.network_settings)
739         self.assertEqual(self.net_config.network_settings.name,
740                          self.network.name)
741         self.assertTrue(validate_network(
742             self.neutron, self.keystone,
743             self.net_config.network_settings.name, True,
744             self.os_creds.project_name))
745
746         subnet_setting = self.net_config.network_settings.subnet_settings[0]
747         self.assertTrue(validate_subnet(
748             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
749
750         self.port = neutron_utils.create_port(
751             self.neutron, self.os_creds,
752             PortConfig(
753                 network_name=self.net_config.network_settings.name,
754                 ip_addrs=[{
755                     'subnet_name': subnet_setting.name,
756                     'ip': ip_1}]))
757
758         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
759         self.assertEqual(self.port, port)
760
761     def test_create_port_null_network_object(self):
762         """
763         Tests the neutron_utils.create_port() function for an Exception when
764         the network object is None
765         """
766         with self.assertRaises(Exception):
767             self.port = neutron_utils.create_port(
768                 self.neutron, self.os_creds,
769                 PortConfig(
770                     name=self.port_name,
771                     network_name=self.net_config.network_settings.name,
772                     ip_addrs=[{
773                         'subnet_name':
774                             self.net_config.network_settings.subnet_settings[
775                                 0].name,
776                         'ip': ip_1}]))
777
778     def test_create_port_null_ip(self):
779         """
780         Tests the neutron_utils.create_port() function for an Exception when
781         the IP value is None
782         """
783         self.network = neutron_utils.create_network(
784             self.neutron, self.os_creds, self.net_config.network_settings)
785         self.assertEqual(self.net_config.network_settings.name,
786                          self.network.name)
787         self.assertTrue(validate_network(
788             self.neutron, self.keystone,
789             self.net_config.network_settings.name, True,
790             self.os_creds.project_name))
791
792         subnet_setting = self.net_config.network_settings.subnet_settings[0]
793         self.assertTrue(validate_subnet(
794             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
795
796         with self.assertRaises(Exception):
797             self.port = neutron_utils.create_port(
798                 self.neutron, self.os_creds,
799                 PortConfig(
800                     name=self.port_name,
801                     network_name=self.net_config.network_settings.name,
802                     ip_addrs=[{
803                         'subnet_name': subnet_setting.name,
804                         'ip': None}]))
805
806     def test_create_port_invalid_ip(self):
807         """
808         Tests the neutron_utils.create_port() function for an Exception when
809         the IP value is None
810         """
811         self.network = neutron_utils.create_network(
812             self.neutron, self.os_creds, self.net_config.network_settings)
813         self.assertEqual(self.net_config.network_settings.name,
814                          self.network.name)
815         self.assertTrue(validate_network(
816             self.neutron, self.keystone,
817             self.net_config.network_settings.name, True,
818             self.os_creds.project_name))
819
820         subnet_setting = self.net_config.network_settings.subnet_settings[0]
821         self.assertTrue(validate_subnet(
822             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
823
824         with self.assertRaises(Exception):
825             self.port = neutron_utils.create_port(
826                 self.neutron, self.os_creds,
827                 PortConfig(
828                     name=self.port_name,
829                     network_name=self.net_config.network_settings.name,
830                     ip_addrs=[{
831                         'subnet_name': subnet_setting.name,
832                         'ip': 'foo'}]))
833
834     def test_create_port_invalid_ip_to_subnet(self):
835         """
836         Tests the neutron_utils.create_port() function for an Exception when
837         the IP value is None
838         """
839         self.network = neutron_utils.create_network(
840             self.neutron, self.os_creds, self.net_config.network_settings)
841         self.assertEqual(self.net_config.network_settings.name,
842                          self.network.name)
843         self.assertTrue(validate_network(
844             self.neutron, self.keystone,
845             self.net_config.network_settings.name, True,
846             self.os_creds.project_name))
847
848         subnet_setting = self.net_config.network_settings.subnet_settings[0]
849         self.assertTrue(validate_subnet(
850             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
851
852         with self.assertRaises(Exception):
853             self.port = neutron_utils.create_port(
854                 self.neutron, self.os_creds,
855                 PortConfig(
856                     name=self.port_name,
857                     network_name=self.net_config.network_settings.name,
858                     ip_addrs=[{
859                         'subnet_name': subnet_setting.name,
860                         'ip': '10.197.123.100'}]))
861
862
863 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
864     """
865     Test for creating security groups via neutron_utils.py
866     """
867
868     def setUp(self):
869         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
870         self.sec_grp_name = guid + 'name'
871
872         self.security_groups = list()
873         self.security_group_rules = list()
874         self.neutron = neutron_utils.neutron_client(
875             self.os_creds, self.os_session)
876         self.keystone = keystone_utils.keystone_client(
877             self.os_creds, self.os_session)
878
879     def tearDown(self):
880         """
881         Cleans the remote OpenStack objects
882         """
883         for rule in self.security_group_rules:
884             neutron_utils.delete_security_group_rule(self.neutron, rule)
885
886         for security_group in self.security_groups:
887             try:
888                 neutron_utils.delete_security_group(self.neutron,
889                                                     security_group)
890             except:
891                 pass
892
893         super(self.__class__, self).__clean__()
894
895     def test_create_delete_simple_sec_grp(self):
896         """
897         Tests the neutron_utils.create_security_group() function
898         """
899         sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
900         security_group = neutron_utils.create_security_group(
901             self.neutron, self.keystone, sec_grp_settings)
902
903         self.assertTrue(sec_grp_settings.name, security_group.name)
904
905         sec_grp_get = neutron_utils.get_security_group(
906             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
907         self.assertIsNotNone(sec_grp_get)
908         self.assertTrue(validation_utils.objects_equivalent(
909             security_group, sec_grp_get))
910
911         neutron_utils.delete_security_group(self.neutron, security_group)
912         sec_grp_get = neutron_utils.get_security_group(
913             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
914         self.assertIsNone(sec_grp_get)
915
916     def test_create_sec_grp_no_name(self):
917         """
918         Tests the SecurityGroupConfig constructor and
919         neutron_utils.create_security_group() function to ensure that
920         attempting to create a security group without a name will raise an
921         exception
922         """
923         with self.assertRaises(Exception):
924             sec_grp_settings = SecurityGroupConfig()
925             self.security_groups.append(
926                 neutron_utils.create_security_group(
927                     self.neutron, self.keystone, sec_grp_settings))
928
929     def test_create_sec_grp_no_rules(self):
930         """
931         Tests the neutron_utils.create_security_group() function
932         """
933         sec_grp_settings = SecurityGroupConfig(
934             name=self.sec_grp_name, description='hello group')
935         self.security_groups.append(
936             neutron_utils.create_security_group(
937                 self.neutron, self.keystone, sec_grp_settings))
938
939         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
940         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
941
942         sec_grp_get = neutron_utils.get_security_group(
943             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
944         self.assertIsNotNone(sec_grp_get)
945         self.assertEqual(self.security_groups[0], sec_grp_get)
946
947     def test_create_sec_grp_one_rule(self):
948         """
949         Tests the neutron_utils.create_security_group() function
950         """
951
952         sec_grp_rule_settings = SecurityGroupRuleConfig(
953             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
954         sec_grp_settings = SecurityGroupConfig(
955             name=self.sec_grp_name, description='hello group',
956             rule_settings=[sec_grp_rule_settings])
957
958         self.security_groups.append(
959             neutron_utils.create_security_group(
960                 self.neutron, self.keystone, sec_grp_settings))
961         free_rules = neutron_utils.get_rules_by_security_group(
962             self.neutron, self.security_groups[0])
963         for free_rule in free_rules:
964             self.security_group_rules.append(free_rule)
965
966         keystone = keystone_utils.keystone_client(
967             self.os_creds, self.os_session)
968         self.security_group_rules.append(
969             neutron_utils.create_security_group_rule(
970                 self.neutron, keystone, sec_grp_settings.rule_settings[0],
971                 self.os_creds.project_name))
972
973         # Refresh object so it is populated with the newly added rule
974         security_group = neutron_utils.get_security_group(
975             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
976
977         rules = neutron_utils.get_rules_by_security_group(
978             self.neutron, security_group)
979
980         self.assertTrue(
981             validation_utils.objects_equivalent(
982                  self.security_group_rules, rules))
983
984         self.assertTrue(sec_grp_settings.name, security_group.name)
985
986         sec_grp_get = neutron_utils.get_security_group(
987             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
988         self.assertIsNotNone(sec_grp_get)
989         self.assertEqual(security_group, sec_grp_get)
990
991     def test_get_sec_grp_by_id(self):
992         """
993         Tests the neutron_utils.create_security_group() function
994         """
995
996         self.security_groups.append(neutron_utils.create_security_group(
997             self.neutron, self.keystone,
998             SecurityGroupConfig(
999                 name=self.sec_grp_name + '-1', description='hello group')))
1000         self.security_groups.append(neutron_utils.create_security_group(
1001             self.neutron, self.keystone,
1002             SecurityGroupConfig(
1003                 name=self.sec_grp_name + '-2', description='hello group')))
1004
1005         sec_grp_1b = neutron_utils.get_security_group_by_id(
1006             self.neutron, self.security_groups[0].id)
1007         sec_grp_2b = neutron_utils.get_security_group_by_id(
1008             self.neutron, self.security_groups[1].id)
1009
1010         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
1011         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
1012
1013
1014 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
1015     """
1016     Test basic nova keypair functionality
1017     """
1018
1019     def setUp(self):
1020         """
1021         Instantiates the CreateImage object that is responsible for downloading
1022         and creating an OS image file within OpenStack
1023         """
1024         self.neutron = neutron_utils.neutron_client(
1025             self.os_creds, self.os_session)
1026         self.keystone = keystone_utils.keystone_client(
1027             self.os_creds, self.os_session)
1028         self.floating_ip = None
1029
1030     def tearDown(self):
1031         """
1032         Cleans the image and downloaded image file
1033         """
1034         if self.floating_ip:
1035             try:
1036                 neutron_utils.delete_floating_ip(
1037                     self.neutron, self.floating_ip)
1038             except:
1039                 pass
1040
1041         super(self.__class__, self).__clean__()
1042
1043     def test_floating_ips(self):
1044         """
1045         Tests the creation of a floating IP
1046         :return:
1047         """
1048         initial_fips = neutron_utils.get_floating_ips(self.neutron)
1049
1050         self.floating_ip = neutron_utils.create_floating_ip(
1051             self.neutron, self.keystone, self.ext_net_name)
1052         all_fips = neutron_utils.get_floating_ips(self.neutron)
1053         self.assertEqual(len(initial_fips) + 1, len(all_fips))
1054         returned = neutron_utils.get_floating_ip(self.neutron,
1055                                                  self.floating_ip)
1056         self.assertEqual(self.floating_ip.id, returned.id)
1057         self.assertEqual(self.floating_ip.ip, returned.ip)
1058
1059
1060 """
1061 Validation routines
1062 """
1063
1064
1065 def validate_network(neutron, keystone, name, exists, project_name):
1066     """
1067     Returns true if a network for a given name DOES NOT exist if the exists
1068     parameter is false conversely true. Returns false if a network for a given
1069     name DOES exist if the exists parameter is true conversely false.
1070     :param neutron: The neutron client
1071     :param keystone: The keystone client
1072     :param name: The expected network name
1073     :param exists: Whether or not the network name should exist or not
1074     :param project_name: the associated project name
1075     :return: True/False
1076     """
1077     network = neutron_utils.get_network(
1078         neutron, keystone, network_name=name, project_name=project_name)
1079     if exists and network:
1080         return True
1081     if not exists and not network:
1082         return True
1083     return False
1084
1085
1086 def validate_subnet(neutron, name, cidr, exists):
1087     """
1088     Returns true if a subnet for a given name DOES NOT exist if the exists
1089     parameter is false conversely true. Returns false if a subnet for a given
1090     name DOES exist if the exists parameter is true conversely false.
1091     :param neutron: The neutron client
1092     :param name: The expected subnet name
1093     :param cidr: The expected CIDR value
1094     :param exists: Whether or not the network name should exist or not
1095     :return: True/False
1096     """
1097     subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
1098     if exists and subnet and subnet.name == name:
1099         return subnet.cidr == cidr
1100     if not exists and not subnet:
1101         return True
1102     return False
1103
1104
1105 def validate_router(neutron, keystone, name, project_name, exists):
1106     """
1107     Returns true if a router for a given name DOES NOT exist if the exists
1108     parameter is false conversely true. Returns false if a router for a given
1109     name DOES exist if the exists parameter is true conversely false.
1110     :param neutron: The neutron client
1111     :param keystone: The keystone client
1112     :param name: The expected router name
1113     :param project_name: The name of the project in which the router should
1114                          exist
1115     :param exists: Whether or not the network name should exist or not
1116     :return: True/False
1117     """
1118     router = neutron_utils.get_router(
1119         neutron, keystone, router_name=name, project_name=project_name)
1120     if exists and router:
1121         return True
1122     return False
1123
1124
1125 def validate_interface_router(interface_router, router, subnet):
1126     """
1127     Returns true if the router ID & subnet ID have been properly included into
1128     the interface router object
1129     :param interface_router: the SNAPS-OO InterfaceRouter domain object
1130     :param router: to validate against the interface_router
1131     :param subnet: to validate against the interface_router
1132     :return: True if both IDs match else False
1133     """
1134     subnet_id = interface_router.subnet_id
1135     router_id = interface_router.port_id
1136
1137     return subnet.id == subnet_id and router.id == router_id
1138
1139
1140 def validate_port(neutron, port_obj, this_port_name):
1141     """
1142     Returns true if a port for a given name DOES NOT exist if the exists
1143     parameter is false conversely true. Returns false if a port for a given
1144     name DOES exist if the exists parameter is true conversely false.
1145     :param neutron: The neutron client
1146     :param port_obj: The port object to lookup
1147     :param this_port_name: The expected router name
1148     :return: True/False
1149     """
1150     os_ports = neutron.list_ports()
1151     for os_port, os_port_insts in os_ports.items():
1152         for os_inst in os_port_insts:
1153             if os_inst['id'] == port_obj.id:
1154                 return os_inst['name'] == this_port_name
1155     return False