Added MTU network config and updated flavor_metadata test support
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21     SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
35 class NeutronSmokeTests(OSComponentTestCase):
36     """
37     Tests to ensure that the neutron client can communicate with the cloud
38     """
39
40     def test_neutron_connect_success(self):
41         """
42         Tests to ensure that the proper credentials can connect.
43         """
44         neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
45
46         networks = neutron.list_networks()
47
48         found = False
49         networks = networks.get('networks')
50         for network in networks:
51             if network.get('name') == self.ext_net_name:
52                 found = True
53         self.assertTrue(found)
54
55     def test_neutron_connect_fail(self):
56         """
57         Tests to ensure that the improper credentials cannot connect.
58         """
59         from snaps.openstack.os_credentials import OSCreds
60
61         with self.assertRaises(Exception):
62             neutron = neutron_utils.neutron_client(
63                 OSCreds(username='user', password='pass', auth_url='url',
64                         project_name='project'))
65             neutron.list_networks()
66
67     def test_retrieve_ext_network_name(self):
68         """
69         Tests the neutron_utils.get_external_network_names to ensure the
70         configured self.ext_net_name is contained within the returned list
71         :return:
72         """
73         neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
74         ext_networks = neutron_utils.get_external_networks(neutron)
75         found = False
76         for network in ext_networks:
77             if network.name == self.ext_net_name:
78                 found = True
79                 break
80         self.assertTrue(found)
81
82
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
84     """
85     Test for creating networks via neutron_utils.py
86     """
87
88     def setUp(self):
89         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90         self.port_name = str(guid) + '-port'
91         self.neutron = neutron_utils.neutron_client(
92             self.os_creds, self.os_session)
93         self.keystone = keystone_utils.keystone_client(
94             self.os_creds, self.os_session)
95         self.network = None
96         self.net_config = openstack_tests.get_pub_net_config(
97             project_name=self.os_creds.project_name,
98             net_name=guid + '-pub-net')
99
100     def tearDown(self):
101         """
102         Cleans the remote OpenStack objects
103         """
104         if self.network:
105             neutron_utils.delete_network(self.neutron, self.network)
106
107         super(self.__class__, self).__clean__()
108
109     def test_create_network(self):
110         """
111         Tests the neutron_utils.create_network() function
112         """
113         self.network = neutron_utils.create_network(
114             self.neutron, self.os_creds, self.net_config.network_settings)
115         self.assertEqual(self.net_config.network_settings.name,
116                          self.network.name)
117         self.assertTrue(validate_network(
118             self.neutron, self.keystone,
119             self.net_config.network_settings.name, True,
120             self.os_creds.project_name))
121         self.assertEqual(len(self.net_config.network_settings.subnet_settings),
122                          len(self.network.subnets))
123
124     def test_create_network_empty_name(self):
125         """
126         Tests the neutron_utils.create_network() function with an empty
127         network name
128         """
129         with self.assertRaises(Exception):
130             self.network = neutron_utils.create_network(
131                 self.neutron, self.os_creds,
132                 network_settings=NetworkConfig(name=''))
133
134     def test_create_network_null_name(self):
135         """
136         Tests the neutron_utils.create_network() function when the network
137         name is None
138         """
139         with self.assertRaises(Exception):
140             self.network = neutron_utils.create_network(
141                 self.neutron, self.os_creds,
142                 network_settings=NetworkConfig())
143
144
145 class NeutronUtilsSubnetTests(OSComponentTestCase):
146     """
147     Test for creating networks with subnets via neutron_utils.py
148     """
149
150     def setUp(self):
151         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
152         self.port_name = str(guid) + '-port'
153         self.neutron = neutron_utils.neutron_client(
154             self.os_creds, self.os_session)
155         self.keystone = keystone_utils.keystone_client(
156             self.os_creds, self.os_session)
157         self.network = None
158         self.net_config = openstack_tests.get_pub_net_config(
159             project_name=self.os_creds.project_name,
160             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
161             external_net=self.ext_net_name)
162
163     def tearDown(self):
164         """
165         Cleans the remote OpenStack objects
166         """
167         if self.network:
168             try:
169                 neutron_utils.delete_network(self.neutron, self.network)
170             except:
171                 pass
172
173         super(self.__class__, self).__clean__()
174
175     def test_create_subnet(self):
176         """
177         Tests the neutron_utils.create_network() function
178         """
179         self.network = neutron_utils.create_network(
180             self.neutron, self.os_creds, self.net_config.network_settings)
181         self.assertEqual(self.net_config.network_settings.name,
182                          self.network.name)
183         self.assertTrue(validate_network(
184             self.neutron, self.keystone,
185             self.net_config.network_settings.name, True,
186             self.os_creds.project_name))
187
188         subnet_setting = self.net_config.network_settings.subnet_settings[0]
189         self.assertTrue(validate_subnet(
190             self.neutron, self.network, subnet_setting.name,
191             subnet_setting.cidr, True))
192
193         subnet_query1 = neutron_utils.get_subnet(
194             self.neutron, self.network, subnet_name=subnet_setting.name)
195         self.assertEqual(self.network.subnets[0], subnet_query1)
196
197         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
198                                                              self.network)
199         self.assertIsNotNone(subnet_query2)
200         self.assertEqual(1, len(subnet_query2))
201         self.assertEqual(self.network.subnets[0], subnet_query2[0])
202
203         subnet_query3 = neutron_utils.get_subnet_by_name(
204             self.neutron, self.keystone, subnet_setting.name,
205             self.os_creds.project_name)
206         self.assertIsNotNone(subnet_query3)
207         self.assertEqual(self.network.subnets[0], subnet_query3)
208
209     def test_create_subnet_null_name(self):
210         """
211         Tests the neutron_utils.create_neutron_subnet() function for an
212         Exception when the subnet name is None
213         """
214         self.network = neutron_utils.create_network(
215             self.neutron, self.os_creds, self.net_config.network_settings)
216         self.assertEqual(self.net_config.network_settings.name,
217                          self.network.name)
218         self.assertTrue(validate_network(
219             self.neutron, self.keystone,
220             self.net_config.network_settings.name, True,
221             self.os_creds.project_name))
222
223         with self.assertRaises(Exception):
224             SubnetConfig(cidr=self.net_config.subnet_cidr)
225
226     def test_create_subnet_empty_name(self):
227         """
228         Tests the neutron_utils.create_network() function with an empty
229         name
230         """
231         self.network = neutron_utils.create_network(
232             self.neutron, self.os_creds, self.net_config.network_settings)
233         self.assertEqual(self.net_config.network_settings.name,
234                          self.network.name)
235         self.assertTrue(validate_network(
236             self.neutron, self.keystone,
237             self.net_config.network_settings.name, True,
238             self.os_creds.project_name))
239
240         subnet_setting = self.net_config.network_settings.subnet_settings[0]
241         self.assertTrue(validate_subnet(
242             self.neutron, self.network, subnet_setting.name,
243             subnet_setting.cidr, True))
244         self.assertFalse(validate_subnet(
245             self.neutron, self.network, '', subnet_setting.cidr, True))
246
247         subnet_query1 = neutron_utils.get_subnet(
248             self.neutron, self.network, subnet_name=subnet_setting.name)
249         self.assertEqual(self.network.subnets[0], subnet_query1)
250
251         subnet_query2 = neutron_utils.get_subnets_by_network(
252             self.neutron, self.network)
253         self.assertIsNotNone(subnet_query2)
254         self.assertEqual(1, len(subnet_query2))
255         self.assertEqual(self.network.subnets[0], subnet_query2[0])
256
257     def test_create_subnet_null_cidr(self):
258         """
259         Tests the neutron_utils.create_neutron_subnet() function for an
260         Exception when the subnet CIDR value is None
261         """
262         self.net_config.network_settings.subnet_settings[0].cidr = None
263         with self.assertRaises(Exception):
264             self.network = neutron_utils.create_network(
265                 self.neutron, self.os_creds, self.net_config.network_settings)
266
267     def test_create_subnet_empty_cidr(self):
268         """
269         Tests the neutron_utils.create_neutron_subnet() function for an
270         Exception when the subnet CIDR value is empty
271         """
272         self.net_config.network_settings.subnet_settings[0].cidr = ''
273         with self.assertRaises(Exception):
274             self.network = neutron_utils.create_network(
275                 self.neutron, self.os_creds, self.net_config.network_settings)
276
277
278 class NeutronUtilsIPv6Tests(OSComponentTestCase):
279     """
280     Test for creating IPv6 networks with subnets via neutron_utils.py
281     """
282
283     def setUp(self):
284         self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
285         self.neutron = neutron_utils.neutron_client(
286             self.os_creds, self.os_session)
287         self.network = None
288
289     def tearDown(self):
290         """
291         Cleans the remote OpenStack objects
292         """
293         if self.network:
294             try:
295                 neutron_utils.delete_network(self.neutron, self.network)
296             except:
297                 pass
298
299         super(self.__class__, self).__clean__()
300
301     def test_create_network_slaac(self):
302         """
303         Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
304         is True and IPv6 modes are slaac
305         """
306         sub_setting = SubnetConfig(
307             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
308             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
309             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
310             enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
311         self.network_settings = NetworkConfig(
312             name=self.guid + '-net', subnet_settings=[sub_setting])
313
314         self.network = neutron_utils.create_network(
315             self.neutron, self.os_creds, self.network_settings)
316         self.assertEqual(self.network_settings.name, self.network.name)
317
318         subnet_settings = self.network_settings.subnet_settings[0]
319         self.assertEqual(1, len(self.network.subnets))
320         subnet = self.network.subnets[0]
321
322         self.assertEqual(self.network.id, subnet.network_id)
323         self.assertEqual(subnet_settings.name, subnet.name)
324         self.assertEqual(subnet_settings.start, subnet.start)
325         self.assertEqual(subnet_settings.end, subnet.end)
326         self.assertEqual('1:1::/64', subnet.cidr)
327         self.assertEqual(6, subnet.ip_version)
328         self.assertEqual(1, len(subnet.dns_nameservers))
329         self.assertEqual(
330             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
331         self.assertTrue(subnet.enable_dhcp)
332         self.assertEqual(
333             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
334         self.assertEqual(
335             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
336
337     def test_create_network_stateful(self):
338         """
339         Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
340         is True and IPv6 modes are stateful
341         """
342         sub_setting = SubnetConfig(
343             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
344             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
345             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
346             enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
347             ipv6_address_mode='dhcpv6-stateful')
348         self.network_settings = NetworkConfig(
349             name=self.guid + '-net', subnet_settings=[sub_setting])
350
351         self.network = neutron_utils.create_network(
352             self.neutron, self.os_creds, self.network_settings)
353
354         self.assertEqual(self.network_settings.name, self.network.name)
355
356         subnet_settings = self.network_settings.subnet_settings[0]
357         self.assertEqual(1, len(self.network.subnets))
358         subnet = self.network.subnets[0]
359
360         self.assertEqual(self.network.id, subnet.network_id)
361         self.assertEqual(subnet_settings.name, subnet.name)
362         self.assertEqual(subnet_settings.start, subnet.start)
363         self.assertEqual(subnet_settings.end, subnet.end)
364         self.assertEqual('1:1::/64', subnet.cidr)
365         self.assertEqual(6, subnet.ip_version)
366         self.assertEqual(1, len(subnet.dns_nameservers))
367         self.assertEqual(
368             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
369         self.assertTrue(subnet.enable_dhcp)
370         self.assertEqual(
371             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
372         self.assertEqual(
373             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
374
375     def test_create_network_stateless(self):
376         """
377         Tests the neutron_utils.create_network() when DHCP is enabled and
378         the RA and address modes are both 'slaac'
379         """
380         sub_setting = SubnetConfig(
381             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
382             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
383             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
384             enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
385             ipv6_address_mode='dhcpv6-stateless')
386         self.network_settings = NetworkConfig(
387             name=self.guid + '-net', subnet_settings=[sub_setting])
388
389         self.network = neutron_utils.create_network(
390             self.neutron, self.os_creds, self.network_settings)
391
392         self.assertEqual(self.network_settings.name, self.network.name)
393
394         subnet_settings = self.network_settings.subnet_settings[0]
395         self.assertEqual(1, len(self.network.subnets))
396         subnet = self.network.subnets[0]
397
398         self.assertEqual(self.network.id, subnet.network_id)
399         self.assertEqual(subnet_settings.name, subnet.name)
400         self.assertEqual(subnet_settings.start, subnet.start)
401         self.assertEqual(subnet_settings.end, subnet.end)
402         self.assertEqual('1:1::/64', subnet.cidr)
403         self.assertEqual(6, subnet.ip_version)
404         self.assertEqual(1, len(subnet.dns_nameservers))
405         self.assertEqual(
406             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
407         self.assertTrue(subnet.enable_dhcp)
408         self.assertEqual(
409             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
410         self.assertEqual(
411             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
412
413     def test_create_network_no_dhcp_slaac(self):
414         """
415         Tests the neutron_utils.create_network() for a BadRequest when
416         DHCP is not enabled and the RA and address modes are both 'slaac'
417         """
418         sub_setting = SubnetConfig(
419             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
420             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
421             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
422             enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
423         self.network_settings = NetworkConfig(
424             name=self.guid + '-net', subnet_settings=[sub_setting])
425
426         with self.assertRaises(BadRequest):
427             self.network = neutron_utils.create_network(
428                 self.neutron, self.os_creds, self.network_settings)
429
430     def test_create_network_invalid_start_ip(self):
431         """
432         Tests the neutron_utils.create_network() that contains one IPv6 subnet
433         with an invalid start IP to ensure Neutron assigns it the smallest IP
434         possible
435         """
436         sub_setting = SubnetConfig(
437             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
438             start='foo')
439         self.network_settings = NetworkConfig(
440             name=self.guid + '-net', subnet_settings=[sub_setting])
441
442         self.network = neutron_utils.create_network(
443             self.neutron, self.os_creds, self.network_settings)
444
445         self.assertEqual('1:1::2', self.network.subnets[0].start)
446         self.assertEqual(
447             '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
448
449     def test_create_network_invalid_end_ip(self):
450         """
451         Tests the neutron_utils.create_network() that contains one IPv6 subnet
452         with an invalid end IP to ensure Neutron assigns it the largest IP
453         possible
454         """
455         sub_setting = SubnetConfig(
456             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
457             end='bar')
458         self.network_settings = NetworkConfig(
459             name=self.guid + '-net', subnet_settings=[sub_setting])
460
461         self.network = neutron_utils.create_network(
462             self.neutron, self.os_creds, self.network_settings)
463
464         self.assertEqual('1:1::2', self.network.subnets[0].start)
465         self.assertEqual(
466             '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
467
468     def test_create_network_with_bad_cidr(self):
469         """
470         Tests the neutron_utils.create_network() for a BadRequest when
471         the subnet CIDR is invalid
472         """
473         sub_setting = SubnetConfig(
474             name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
475         self.network_settings = NetworkConfig(
476             name=self.guid + '-net', subnet_settings=[sub_setting])
477
478         with self.assertRaises(BadRequest):
479             self.network = neutron_utils.create_network(
480                 self.neutron, self.os_creds, self.network_settings)
481
482     def test_create_network_invalid_gateway_ip(self):
483         """
484         Tests the neutron_utils.create_network() for a BadRequest when
485         the subnet gateway IP is invalid
486         """
487         sub_setting = SubnetConfig(
488             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
489             gateway_ip='192.168.0.1')
490         self.network_settings = NetworkConfig(
491             name=self.guid + '-net', subnet_settings=[sub_setting])
492
493         with self.assertRaises(BadRequest):
494             self.network = neutron_utils.create_network(
495                 self.neutron, self.os_creds, self.network_settings)
496
497     def test_create_network_with_bad_dns(self):
498         """
499         Tests the neutron_utils.create_network() for a BadRequest when
500         the DNS IP is invalid
501         """
502         sub_setting = SubnetConfig(
503             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
504             dns_nameservers=['foo'])
505         self.network_settings = NetworkConfig(
506             name=self.guid + '-net', subnet_settings=[sub_setting])
507
508         with self.assertRaises(BadRequest):
509             self.network = neutron_utils.create_network(
510                     self.neutron, self.os_creds, self.network_settings)
511
512
513 class NeutronUtilsRouterTests(OSComponentTestCase):
514     """
515     Test for creating routers via neutron_utils.py
516     """
517
518     def setUp(self):
519         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
520         self.port_name = str(guid) + '-port'
521         self.neutron = neutron_utils.neutron_client(
522             self.os_creds, self.os_session)
523         self.keystone = keystone_utils.keystone_client(
524             self.os_creds, self.os_session)
525         self.network = None
526         self.port = None
527         self.router = None
528         self.interface_router = None
529         self.net_config = openstack_tests.get_pub_net_config(
530             project_name=self.os_creds.project_name,
531             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
532             router_name=guid + '-pub-router', external_net=self.ext_net_name)
533
534     def tearDown(self):
535         """
536         Cleans the remote OpenStack objects
537         """
538         if self.interface_router:
539             neutron_utils.remove_interface_router(
540                 self.neutron, self.router, self.network.subnets[0])
541
542         if self.router:
543             try:
544                 neutron_utils.delete_router(self.neutron, self.router)
545                 validate_router(
546                     self.neutron, self.keystone, self.router.name,
547                     self.os_creds.project_name, False)
548             except:
549                 pass
550
551         if self.port:
552             try:
553                 neutron_utils.delete_port(self.neutron, self.port)
554             except:
555                 pass
556
557         if self.network:
558             neutron_utils.delete_network(self.neutron, self.network)
559
560         super(self.__class__, self).__clean__()
561
562     def test_create_router_simple(self):
563         """
564         Tests the neutron_utils.create_router()
565         """
566         self.router = neutron_utils.create_router(
567             self.neutron, self.os_creds, self.net_config.router_settings)
568         validate_router(
569             self.neutron, self.keystone, self.net_config.router_settings.name,
570             self.os_creds.project_name, True)
571
572     def test_create_router_with_public_interface(self):
573         """
574         Tests the neutron_utils.create_router() function with a pubic interface
575         """
576         subnet_setting = self.net_config.network_settings.subnet_settings[0]
577         self.net_config = openstack_tests.OSNetworkConfig(
578             project_name=self.os_creds.project_name,
579             net_name=self.net_config.network_settings.name,
580             subnet_name=subnet_setting.name, subnet_cidr=subnet_setting.cidr,
581             router_name=self.net_config.router_settings.name,
582             external_gateway=self.ext_net_name)
583         self.router = neutron_utils.create_router(
584             self.neutron, self.os_creds, self.net_config.router_settings)
585         validate_router(
586             self.neutron, self.keystone, self.net_config.router_settings.name,
587             self.os_creds.project_name, True)
588
589         ext_net = neutron_utils.get_network(
590             self.neutron, self.keystone, network_name=self.ext_net_name)
591         self.assertEqual(self.router.external_network_id, ext_net.id)
592
593     def test_add_interface_router(self):
594         """
595         Tests the neutron_utils.add_interface_router() function
596         """
597         self.network = neutron_utils.create_network(
598             self.neutron, self.os_creds, self.net_config.network_settings)
599         self.assertEqual(self.net_config.network_settings.name,
600                          self.network.name)
601         self.assertTrue(validate_network(
602             self.neutron, self.keystone,
603             self.net_config.network_settings.name, True,
604             self.os_creds.project_name))
605
606         subnet_setting = self.net_config.network_settings.subnet_settings[0]
607         self.assertTrue(validate_subnet(
608             self.neutron, self.network, subnet_setting.name,
609             subnet_setting.cidr, True))
610
611         self.router = neutron_utils.create_router(
612             self.neutron, self.os_creds, self.net_config.router_settings)
613         validate_router(
614             self.neutron, self.keystone, self.net_config.router_settings.name,
615             self.os_creds.project_name, True)
616
617         self.interface_router = neutron_utils.add_interface_router(
618             self.neutron, self.router, self.network.subnets[0])
619         validate_interface_router(self.interface_router, self.router,
620                                   self.network.subnets[0])
621
622     def test_add_interface_router_null_router(self):
623         """
624         Tests the neutron_utils.add_interface_router() function for an
625         Exception when the router value is None
626         """
627         self.network = neutron_utils.create_network(
628             self.neutron, self.os_creds, self.net_config.network_settings)
629         self.assertEqual(self.net_config.network_settings.name,
630                          self.network.name)
631         self.assertTrue(validate_network(
632             self.neutron, self.keystone,
633             self.net_config.network_settings.name, True,
634             self.os_creds.project_name))
635
636         subnet_setting = self.net_config.network_settings.subnet_settings[0]
637         self.assertTrue(validate_subnet(
638             self.neutron, self.network, subnet_setting.name,
639             subnet_setting.cidr, True))
640
641         with self.assertRaises(NeutronException):
642             self.interface_router = neutron_utils.add_interface_router(
643                 self.neutron, self.router, self.network.subnets[0])
644
645     def test_add_interface_router_null_subnet(self):
646         """
647         Tests the neutron_utils.add_interface_router() function for an
648         Exception when the subnet value is None
649         """
650         self.network = neutron_utils.create_network(
651             self.neutron, self.os_creds, self.net_config.network_settings)
652         self.assertEqual(self.net_config.network_settings.name,
653                          self.network.name)
654         self.assertTrue(validate_network(
655             self.neutron, self.keystone,
656             self.net_config.network_settings.name, True,
657             self.os_creds.project_name))
658
659         self.router = neutron_utils.create_router(
660             self.neutron, self.os_creds, self.net_config.router_settings)
661         validate_router(
662             self.neutron, self.keystone, self.net_config.router_settings.name,
663             self.os_creds.project_name, True)
664
665         with self.assertRaises(NeutronException):
666             self.interface_router = neutron_utils.add_interface_router(
667                 self.neutron, self.router, None)
668
669     def test_add_interface_router_missing_subnet(self):
670         """
671         Tests the neutron_utils.add_interface_router() function for an
672         Exception when the subnet object has been deleted
673         """
674         self.network = neutron_utils.create_network(
675             self.neutron, self.os_creds, self.net_config.network_settings)
676         self.assertEqual(self.net_config.network_settings.name,
677                          self.network.name)
678         self.assertTrue(validate_network(
679             self.neutron, self.keystone,
680             self.net_config.network_settings.name, True,
681             self.os_creds.project_name))
682
683         self.router = neutron_utils.create_router(
684             self.neutron, self.os_creds, self.net_config.router_settings)
685         validate_router(
686             self.neutron, self.keystone, self.net_config.router_settings.name,
687             self.os_creds.project_name, True)
688
689         for subnet in self.network.subnets:
690             neutron_utils.delete_subnet(self.neutron, subnet)
691
692         with self.assertRaises(NotFound):
693             self.interface_router = neutron_utils.add_interface_router(
694                 self.neutron, self.router, self.network.subnets[0])
695
696     def test_create_port(self):
697         """
698         Tests the neutron_utils.create_port() function
699         """
700         self.network = neutron_utils.create_network(
701             self.neutron, self.os_creds, self.net_config.network_settings)
702         self.assertEqual(self.net_config.network_settings.name,
703                          self.network.name)
704         self.assertTrue(validate_network(
705             self.neutron, self.keystone,
706             self.net_config.network_settings.name, True,
707             self.os_creds.project_name))
708
709         subnet_setting = self.net_config.network_settings.subnet_settings[0]
710         self.assertTrue(validate_subnet(
711             self.neutron, self.network, subnet_setting.name,
712             subnet_setting.cidr, True))
713
714         self.port = neutron_utils.create_port(
715             self.neutron, self.os_creds, PortConfig(
716                 name=self.port_name,
717                 ip_addrs=[{
718                     'subnet_name': subnet_setting.name,
719                     'ip': ip_1}],
720                 network_name=self.net_config.network_settings.name))
721         validate_port(self.neutron, self.port, self.port_name)
722
723     def test_create_port_empty_name(self):
724         """
725         Tests the neutron_utils.create_port() function
726         """
727         self.network = neutron_utils.create_network(
728             self.neutron, self.os_creds, self.net_config.network_settings)
729         self.assertEqual(self.net_config.network_settings.name,
730                          self.network.name)
731         self.assertTrue(validate_network(
732             self.neutron, self.keystone,
733             self.net_config.network_settings.name, True,
734             self.os_creds.project_name))
735
736         subnet_setting = self.net_config.network_settings.subnet_settings[0]
737         self.assertTrue(validate_subnet(
738             self.neutron, self.network, subnet_setting.name,
739             subnet_setting.cidr, True))
740
741         self.port = neutron_utils.create_port(
742             self.neutron, self.os_creds, PortConfig(
743                 name=self.port_name,
744                 network_name=self.net_config.network_settings.name,
745                 ip_addrs=[{
746                     'subnet_name': subnet_setting.name,
747                     'ip': ip_1}]))
748         validate_port(self.neutron, self.port, self.port_name)
749
750     def test_create_port_null_name(self):
751         """
752         Tests the neutron_utils.create_port() when the port name value is None
753         """
754         self.network = neutron_utils.create_network(
755             self.neutron, self.os_creds, self.net_config.network_settings)
756         self.assertEqual(self.net_config.network_settings.name,
757                          self.network.name)
758         self.assertTrue(validate_network(
759             self.neutron, self.keystone,
760             self.net_config.network_settings.name, True,
761             self.os_creds.project_name))
762
763         subnet_setting = self.net_config.network_settings.subnet_settings[0]
764         self.assertTrue(validate_subnet(
765             self.neutron, self.network, subnet_setting.name,
766             subnet_setting.cidr, True))
767
768         self.port = neutron_utils.create_port(
769             self.neutron, self.os_creds,
770             PortConfig(
771                 network_name=self.net_config.network_settings.name,
772                 ip_addrs=[{
773                     'subnet_name': subnet_setting.name,
774                     'ip': ip_1}]))
775
776         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
777         self.assertEqual(self.port, port)
778
779     def test_create_port_null_network_object(self):
780         """
781         Tests the neutron_utils.create_port() function for an Exception when
782         the network object is None
783         """
784         with self.assertRaises(Exception):
785             self.port = neutron_utils.create_port(
786                 self.neutron, self.os_creds,
787                 PortConfig(
788                     name=self.port_name,
789                     network_name=self.net_config.network_settings.name,
790                     ip_addrs=[{
791                         'subnet_name':
792                             self.net_config.network_settings.subnet_settings[
793                                 0].name,
794                         'ip': ip_1}]))
795
796     def test_create_port_null_ip(self):
797         """
798         Tests the neutron_utils.create_port() function for an Exception when
799         the IP value is None
800         """
801         self.network = neutron_utils.create_network(
802             self.neutron, self.os_creds, self.net_config.network_settings)
803         self.assertEqual(self.net_config.network_settings.name,
804                          self.network.name)
805         self.assertTrue(validate_network(
806             self.neutron, self.keystone,
807             self.net_config.network_settings.name, True,
808             self.os_creds.project_name))
809
810         subnet_setting = self.net_config.network_settings.subnet_settings[0]
811         self.assertTrue(validate_subnet(
812             self.neutron, self.network, subnet_setting.name,
813             subnet_setting.cidr, True))
814
815         with self.assertRaises(Exception):
816             self.port = neutron_utils.create_port(
817                 self.neutron, self.os_creds,
818                 PortConfig(
819                     name=self.port_name,
820                     network_name=self.net_config.network_settings.name,
821                     ip_addrs=[{
822                         'subnet_name': subnet_setting.name,
823                         'ip': None}]))
824
825     def test_create_port_invalid_ip(self):
826         """
827         Tests the neutron_utils.create_port() function for an Exception when
828         the IP value is None
829         """
830         self.network = neutron_utils.create_network(
831             self.neutron, self.os_creds, self.net_config.network_settings)
832         self.assertEqual(self.net_config.network_settings.name,
833                          self.network.name)
834         self.assertTrue(validate_network(
835             self.neutron, self.keystone,
836             self.net_config.network_settings.name, True,
837             self.os_creds.project_name))
838
839         subnet_setting = self.net_config.network_settings.subnet_settings[0]
840         self.assertTrue(validate_subnet(
841             self.neutron, self.network, subnet_setting.name,
842             subnet_setting.cidr, True))
843
844         with self.assertRaises(Exception):
845             self.port = neutron_utils.create_port(
846                 self.neutron, self.os_creds,
847                 PortConfig(
848                     name=self.port_name,
849                     network_name=self.net_config.network_settings.name,
850                     ip_addrs=[{
851                         'subnet_name': subnet_setting.name,
852                         'ip': 'foo'}]))
853
854     def test_create_port_invalid_ip_to_subnet(self):
855         """
856         Tests the neutron_utils.create_port() function for an Exception when
857         the IP value is None
858         """
859         self.network = neutron_utils.create_network(
860             self.neutron, self.os_creds, self.net_config.network_settings)
861         self.assertEqual(self.net_config.network_settings.name,
862                          self.network.name)
863         self.assertTrue(validate_network(
864             self.neutron, self.keystone,
865             self.net_config.network_settings.name, True,
866             self.os_creds.project_name))
867
868         subnet_setting = self.net_config.network_settings.subnet_settings[0]
869         self.assertTrue(validate_subnet(
870             self.neutron, self.network, subnet_setting.name,
871             subnet_setting.cidr, True))
872
873         with self.assertRaises(Exception):
874             self.port = neutron_utils.create_port(
875                 self.neutron, self.os_creds,
876                 PortConfig(
877                     name=self.port_name,
878                     network_name=self.net_config.network_settings.name,
879                     ip_addrs=[{
880                         'subnet_name': subnet_setting.name,
881                         'ip': '10.197.123.100'}]))
882
883
884 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
885     """
886     Test for creating security groups via neutron_utils.py
887     """
888
889     def setUp(self):
890         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
891         self.sec_grp_name = guid + 'name'
892
893         self.security_groups = list()
894         self.security_group_rules = list()
895         self.neutron = neutron_utils.neutron_client(
896             self.os_creds, self.os_session)
897         self.keystone = keystone_utils.keystone_client(
898             self.os_creds, self.os_session)
899
900     def tearDown(self):
901         """
902         Cleans the remote OpenStack objects
903         """
904         for rule in self.security_group_rules:
905             neutron_utils.delete_security_group_rule(self.neutron, rule)
906
907         for security_group in self.security_groups:
908             try:
909                 neutron_utils.delete_security_group(self.neutron,
910                                                     security_group)
911             except:
912                 pass
913
914         super(self.__class__, self).__clean__()
915
916     def test_create_delete_simple_sec_grp(self):
917         """
918         Tests the neutron_utils.create_security_group() function
919         """
920         sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
921         security_group = neutron_utils.create_security_group(
922             self.neutron, self.keystone, sec_grp_settings)
923
924         self.assertTrue(sec_grp_settings.name, security_group.name)
925
926         sec_grp_get = neutron_utils.get_security_group(
927             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
928         self.assertIsNotNone(sec_grp_get)
929         self.assertTrue(validation_utils.objects_equivalent(
930             security_group, sec_grp_get))
931
932         neutron_utils.delete_security_group(self.neutron, security_group)
933         sec_grp_get = neutron_utils.get_security_group(
934             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
935         self.assertIsNone(sec_grp_get)
936
937     def test_create_sec_grp_no_name(self):
938         """
939         Tests the SecurityGroupConfig constructor and
940         neutron_utils.create_security_group() function to ensure that
941         attempting to create a security group without a name will raise an
942         exception
943         """
944         with self.assertRaises(Exception):
945             sec_grp_settings = SecurityGroupConfig()
946             self.security_groups.append(
947                 neutron_utils.create_security_group(
948                     self.neutron, self.keystone, sec_grp_settings))
949
950     def test_create_sec_grp_no_rules(self):
951         """
952         Tests the neutron_utils.create_security_group() function
953         """
954         sec_grp_settings = SecurityGroupConfig(
955             name=self.sec_grp_name, description='hello group')
956         self.security_groups.append(
957             neutron_utils.create_security_group(
958                 self.neutron, self.keystone, sec_grp_settings))
959
960         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
961         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
962
963         sec_grp_get = neutron_utils.get_security_group(
964             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
965         self.assertIsNotNone(sec_grp_get)
966         self.assertEqual(self.security_groups[0], sec_grp_get)
967
968     def test_create_sec_grp_one_rule(self):
969         """
970         Tests the neutron_utils.create_security_group() function
971         """
972
973         sec_grp_rule_settings = SecurityGroupRuleConfig(
974             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
975         sec_grp_settings = SecurityGroupConfig(
976             name=self.sec_grp_name, description='hello group',
977             rule_settings=[sec_grp_rule_settings])
978
979         self.security_groups.append(
980             neutron_utils.create_security_group(
981                 self.neutron, self.keystone, sec_grp_settings))
982         free_rules = neutron_utils.get_rules_by_security_group(
983             self.neutron, self.security_groups[0])
984         for free_rule in free_rules:
985             self.security_group_rules.append(free_rule)
986
987         keystone = keystone_utils.keystone_client(
988             self.os_creds, self.os_session)
989         self.security_group_rules.append(
990             neutron_utils.create_security_group_rule(
991                 self.neutron, keystone, sec_grp_settings.rule_settings[0],
992                 self.os_creds.project_name))
993
994         # Refresh object so it is populated with the newly added rule
995         security_group = neutron_utils.get_security_group(
996             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
997
998         rules = neutron_utils.get_rules_by_security_group(
999             self.neutron, security_group)
1000
1001         self.assertTrue(
1002             validation_utils.objects_equivalent(
1003                  self.security_group_rules, rules))
1004
1005         self.assertTrue(sec_grp_settings.name, security_group.name)
1006
1007         sec_grp_get = neutron_utils.get_security_group(
1008             self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
1009         self.assertIsNotNone(sec_grp_get)
1010         self.assertEqual(security_group, sec_grp_get)
1011
1012     def test_get_sec_grp_by_id(self):
1013         """
1014         Tests the neutron_utils.create_security_group() function
1015         """
1016
1017         self.security_groups.append(neutron_utils.create_security_group(
1018             self.neutron, self.keystone,
1019             SecurityGroupConfig(
1020                 name=self.sec_grp_name + '-1', description='hello group')))
1021         self.security_groups.append(neutron_utils.create_security_group(
1022             self.neutron, self.keystone,
1023             SecurityGroupConfig(
1024                 name=self.sec_grp_name + '-2', description='hello group')))
1025
1026         sec_grp_1b = neutron_utils.get_security_group_by_id(
1027             self.neutron, self.security_groups[0].id)
1028         sec_grp_2b = neutron_utils.get_security_group_by_id(
1029             self.neutron, self.security_groups[1].id)
1030
1031         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
1032         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
1033
1034     def test_create_list_sec_grp_no_rules(self):
1035         """
1036         Tests the neutron_utils.create_security_group() and
1037         list_security_groups function
1038         """
1039         sec_grp_settings = SecurityGroupConfig(
1040             name=self.sec_grp_name + "-1", description='hello group')
1041         self.security_groups.append(neutron_utils.create_security_group(
1042             self.neutron, self.keystone, sec_grp_settings))
1043
1044         sec_grp_settings2 = SecurityGroupConfig(
1045             name=self.sec_grp_name + "-2", description='hola group')
1046         self.security_groups.append(neutron_utils.create_security_group(
1047             self.neutron, self.keystone, sec_grp_settings2))
1048
1049         returned_sec_groups = neutron_utils.list_security_groups(self.neutron)
1050
1051         self.assertIsNotNone(returned_sec_groups)
1052         worked = 0
1053         for sg in returned_sec_groups:
1054             if sec_grp_settings.name == sg.name:
1055                 worked += 1
1056             elif sec_grp_settings2.name == sg.name:
1057                 worked += 1
1058
1059         self.assertEqual(worked, 2)
1060
1061
1062 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
1063     """
1064     Test basic nova keypair functionality
1065     """
1066
1067     def setUp(self):
1068         """
1069         Instantiates the CreateImage object that is responsible for downloading
1070         and creating an OS image file within OpenStack
1071         """
1072         self.neutron = neutron_utils.neutron_client(
1073             self.os_creds, self.os_session)
1074         self.keystone = keystone_utils.keystone_client(
1075             self.os_creds, self.os_session)
1076         self.floating_ip = None
1077
1078     def tearDown(self):
1079         """
1080         Cleans the image and downloaded image file
1081         """
1082         if self.floating_ip:
1083             try:
1084                 neutron_utils.delete_floating_ip(
1085                     self.neutron, self.floating_ip)
1086             except:
1087                 pass
1088
1089         super(self.__class__, self).__clean__()
1090
1091     def test_floating_ips(self):
1092         """
1093         Tests the creation of a floating IP
1094         :return:
1095         """
1096         initial_fips = neutron_utils.get_floating_ips(self.neutron)
1097
1098         self.floating_ip = neutron_utils.create_floating_ip(
1099             self.neutron, self.keystone, self.ext_net_name)
1100         all_fips = neutron_utils.get_floating_ips(self.neutron)
1101         self.assertEqual(len(initial_fips) + 1, len(all_fips))
1102         returned = neutron_utils.get_floating_ip(self.neutron,
1103                                                  self.floating_ip)
1104         self.assertEqual(self.floating_ip.id, returned.id)
1105         self.assertEqual(self.floating_ip.ip, returned.ip)
1106
1107
1108 """
1109 Validation routines
1110 """
1111
1112
1113 def validate_network(neutron, keystone, name, exists, project_name, mtu=None):
1114     """
1115     Returns true if a network for a given name DOES NOT exist if the exists
1116     parameter is false conversely true. Returns false if a network for a given
1117     name DOES exist if the exists parameter is true conversely false.
1118     :param neutron: The neutron client
1119     :param keystone: The keystone client
1120     :param name: The expected network name
1121     :param exists: Whether or not the network name should exist or not
1122     :param project_name: the associated project name
1123     :return: True/False
1124     """
1125     network = neutron_utils.get_network(
1126         neutron, keystone, network_name=name, project_name=project_name)
1127     if exists and network:
1128         return True
1129     if not exists and not network:
1130         return True
1131     if mtu:
1132         return mtu == network.mtu
1133     return False
1134
1135
1136 def validate_subnet(neutron, network, name, cidr, exists):
1137     """
1138     Returns true if a subnet for a given name DOES NOT exist if the exists
1139     parameter is false conversely true. Returns false if a subnet for a given
1140     name DOES exist if the exists parameter is true conversely false.
1141     :param neutron: The neutron client
1142     :param network: The SNAPS-OO Network domain object
1143     :param name: The expected subnet name
1144     :param cidr: The expected CIDR value
1145     :param exists: Whether or not the network name should exist or not
1146     :return: True/False
1147     """
1148     subnet = neutron_utils.get_subnet(
1149         neutron, network, subnet_name=name)
1150     if exists and subnet and subnet.name == name:
1151         return subnet.cidr == cidr
1152     if not exists and not subnet:
1153         return True
1154     return False
1155
1156
1157 def validate_router(neutron, keystone, name, project_name, exists):
1158     """
1159     Returns true if a router for a given name DOES NOT exist if the exists
1160     parameter is false conversely true. Returns false if a router for a given
1161     name DOES exist if the exists parameter is true conversely false.
1162     :param neutron: The neutron client
1163     :param keystone: The keystone client
1164     :param name: The expected router name
1165     :param project_name: The name of the project in which the router should
1166                          exist
1167     :param exists: Whether or not the network name should exist or not
1168     :return: True/False
1169     """
1170     router = neutron_utils.get_router(
1171         neutron, keystone, router_name=name, project_name=project_name)
1172     if exists and router:
1173         return True
1174     return False
1175
1176
1177 def validate_interface_router(interface_router, router, subnet):
1178     """
1179     Returns true if the router ID & subnet ID have been properly included into
1180     the interface router object
1181     :param interface_router: the SNAPS-OO InterfaceRouter domain object
1182     :param router: to validate against the interface_router
1183     :param subnet: to validate against the interface_router
1184     :return: True if both IDs match else False
1185     """
1186     subnet_id = interface_router.subnet_id
1187     router_id = interface_router.port_id
1188
1189     return subnet.id == subnet_id and router.id == router_id
1190
1191
1192 def validate_port(neutron, port_obj, this_port_name):
1193     """
1194     Returns true if a port for a given name DOES NOT exist if the exists
1195     parameter is false conversely true. Returns false if a port for a given
1196     name DOES exist if the exists parameter is true conversely false.
1197     :param neutron: The neutron client
1198     :param port_obj: The port object to lookup
1199     :param this_port_name: The expected router name
1200     :return: True/False
1201     """
1202     os_ports = neutron.list_ports()
1203     for os_port, os_port_insts in os_ports.items():
1204         for os_inst in os_port_insts:
1205             if os_inst['id'] == port_obj.id:
1206                 return os_inst['name'] == this_port_name
1207     return False