d43b969a365e530e739facc833abb902f68e14e7
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21     SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests verifying that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Ensures a client built from the configured credentials can list the
        networks and that the configured external network is one of them.
        """
        neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)

        response = neutron.list_networks()

        # Collect every network name up front, then test for membership
        net_names = [net.get('name') for net in response.get('networks')]
        self.assertTrue(self.ext_net_name in net_names)

    def test_neutron_connect_fail(self):
        """
        Ensures that building and using a client with bogus credentials
        raises an exception.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bad_creds = OSCreds(
                username='user', password='pass', auth_url='url',
                project_name='project')
            bad_client = neutron_utils.neutron_client(bad_creds)
            bad_client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Tests neutron_utils.get_external_networks() to ensure that a network
        named self.ext_net_name is contained within the returned collection
        :return:
        """
        neutron = neutron_utils.neutron_client(self.os_creds, self.os_session)
        ext_networks = neutron_utils.get_external_networks(neutron)

        # any() stops at the first match
        self.assertTrue(
            any(ext_net.name == self.ext_net_name for ext_net in ext_networks))
81
82
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Test for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients and a uniquely named network
        configuration; no remote network is created here
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        # Populated by the tests so tearDown knows what to delete
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            project_name=self.os_creds.project_name,
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)

        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this test case is subclassed.
        super(NeutronUtilsNetworkTests, self).__clean__()

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone,
            self.net_config.network_settings.name, True,
            self.os_creds.project_name))
        # The created network must carry one subnet per configured subnet
        self.assertEqual(len(self.net_config.network_settings.subnet_settings),
                         len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function with an empty
        network name
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function when the network
        name is None
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig())
143
144
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients and a uniquely named
        network/subnet configuration; no remote objects are created here
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        # Populated by the tests so tearDown knows what to delete
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            project_name=self.os_creds.project_name,
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup; unlike a bare except this still lets
                # KeyboardInterrupt and SystemExit propagate.
                pass

        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this test case is subclassed.
        super(NeutronUtilsSubnetTests, self).__clean__()

    def test_create_subnet(self):
        """
        Tests that neutron_utils.create_network() creates the configured
        subnet and that the subnet can be retrieved through get_subnet(),
        get_subnets_by_network() and get_subnet_by_name()
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone,
            self.net_config.network_settings.name, True,
            self.os_creds.project_name))

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, self.network, subnet_setting.name,
            subnet_setting.cidr, True))

        subnet_query1 = neutron_utils.get_subnet(
            self.neutron, self.network, subnet_name=subnet_setting.name)
        self.assertEqual(self.network.subnets[0], subnet_query1)

        subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
                                                             self.network)
        self.assertIsNotNone(subnet_query2)
        self.assertEqual(1, len(subnet_query2))
        self.assertEqual(self.network.subnets[0], subnet_query2[0])

        subnet_query3 = neutron_utils.get_subnet_by_name(
            self.neutron, self.keystone, subnet_setting.name,
            self.os_creds.project_name)
        self.assertIsNotNone(subnet_query3)
        self.assertEqual(self.network.subnets[0], subnet_query3)

    def test_create_subnet_null_name(self):
        """
        Creates the configured network, then ensures that constructing a
        SubnetConfig without a name raises an Exception
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone,
            self.net_config.network_settings.name, True,
            self.os_creds.project_name))

        with self.assertRaises(Exception):
            SubnetConfig(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Tests the neutron_utils.create_network() function and ensures that
        validating the created subnet against an empty name fails
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name,
                         self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone,
            self.net_config.network_settings.name, True,
            self.os_creds.project_name))

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, self.network, subnet_setting.name,
            subnet_setting.cidr, True))
        # Same lookup with an empty subnet name must not validate
        self.assertFalse(validate_subnet(
            self.neutron, self.network, '', subnet_setting.cidr, True))

        subnet_query1 = neutron_utils.get_subnet(
            self.neutron, self.network, subnet_name=subnet_setting.name)
        self.assertEqual(self.network.subnets[0], subnet_query1)

        subnet_query2 = neutron_utils.get_subnets_by_network(
            self.neutron, self.network)
        self.assertIsNotNone(subnet_query2)
        self.assertEqual(1, len(subnet_query2))
        self.assertEqual(self.network.subnets[0], subnet_query2[0])

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is None
        """
        self.net_config.network_settings.subnet_settings[0].cidr = None
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is empty
        """
        self.net_config.network_settings.subnet_settings[0].cidr = ''
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)
276
277
class NeutronUtilsIPv6Tests(OSComponentTestCase):
    """
    Test for creating IPv6 networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and a unique name prefix; the network
        settings and the remote network are created by each test
        """
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        # Populated by the tests so tearDown knows what to delete
        self.network = None
        self.network_settings = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup; unlike a bare except this still lets
                # KeyboardInterrupt and SystemExit propagate.
                pass

        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this test case is subclassed.
        super(NeutronUtilsIPv6Tests, self).__clean__()

    def _build_network_settings(self, **subnet_kwargs):
        """
        Stores in self.network_settings a NetworkConfig holding a single
        IPv6 subnet named after self.guid
        :param subnet_kwargs: additional SubnetConfig keyword arguments
        :return: the SubnetConfig object that was created
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', ip_version=6, **subnet_kwargs)
        self.network_settings = NetworkConfig(
            name=self.guid + '-net', subnet_settings=[sub_setting])
        return sub_setting

    def _create_and_check_dhcp_network(self, ipv6_mode):
        """
        Creates a network with one DHCP-enabled IPv6 subnet whose RA and
        address modes are both ipv6_mode and checks the resulting subnet
        against the requested settings
        :param ipv6_mode: value used for ipv6_ra_mode and ipv6_address_mode
        """
        sub_setting = self._build_network_settings(
            cidr='1:1:0:0:0:0:0:0/64',
            dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=True, ipv6_ra_mode=ipv6_mode,
            ipv6_address_mode=ipv6_mode)

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)
        self.assertEqual(self.network_settings.name, self.network.name)

        subnet_settings = self.network_settings.subnet_settings[0]
        self.assertEqual(1, len(self.network.subnets))
        subnet = self.network.subnets[0]

        self.assertEqual(self.network.id, subnet.network_id)
        self.assertEqual(subnet_settings.name, subnet.name)
        self.assertEqual(subnet_settings.start, subnet.start)
        self.assertEqual(subnet_settings.end, subnet.end)
        # The expanded CIDR given above is expected back in compressed form
        self.assertEqual('1:1::/64', subnet.cidr)
        self.assertEqual(6, subnet.ip_version)
        self.assertEqual(1, len(subnet.dns_nameservers))
        self.assertEqual(
            sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
        self.assertTrue(subnet.enable_dhcp)
        self.assertEqual(
            subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
        self.assertEqual(
            subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)

    def _assert_create_bad_request(self):
        """
        Ensures that creating the network described by self.network_settings
        raises a BadRequest
        """
        with self.assertRaises(BadRequest):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.network_settings)

    def test_create_network_slaac(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are slaac
        """
        self._create_and_check_dhcp_network('slaac')

    def test_create_network_stateful(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are stateful
        """
        self._create_and_check_dhcp_network('dhcpv6-stateful')

    def test_create_network_stateless(self):
        """
        Tests the neutron_utils.create_network() when DHCP is enabled and
        the RA and address modes are both 'dhcpv6-stateless'
        """
        self._create_and_check_dhcp_network('dhcpv6-stateless')

    def test_create_network_no_dhcp_slaac(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        DHCP is not enabled and the RA and address modes are both 'slaac'
        """
        self._build_network_settings(
            cidr='1:1:0:0:0:0:0:0/64',
            dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')

        self._assert_create_bad_request()

    def test_create_network_invalid_start_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid start IP to ensure Neutron assigns it the smallest IP
        possible
        """
        self._build_network_settings(cidr='1:1::/48', start='foo')

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_invalid_end_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid end IP to ensure Neutron assigns it the largest IP
        possible
        """
        self._build_network_settings(cidr='1:1::/48', end='bar')

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_with_bad_cidr(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet CIDR is invalid
        """
        self._build_network_settings(cidr='1:1:1:/48')

        self._assert_create_bad_request()

    def test_create_network_invalid_gateway_ip(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet gateway IP is invalid
        """
        # An IPv4 gateway address on an IPv6 subnet
        self._build_network_settings(
            cidr='1:1::/48', gateway_ip='192.168.0.1')

        self._assert_create_bad_request()

    def test_create_network_with_bad_dns(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the DNS IP is invalid
        """
        self._build_network_settings(
            cidr='1:1::/48', dns_nameservers=['foo'])

        self._assert_create_bad_request()
511
512
513 class NeutronUtilsRouterTests(OSComponentTestCase):
514     """
515     Test for creating routers via neutron_utils.py
516     """
517
518     def setUp(self):
519         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
520         self.port_name = str(guid) + '-port'
521         self.neutron = neutron_utils.neutron_client(
522             self.os_creds, self.os_session)
523         self.keystone = keystone_utils.keystone_client(
524             self.os_creds, self.os_session)
525         self.network = None
526         self.port = None
527         self.router = None
528         self.interface_router = None
529         self.net_config = openstack_tests.get_pub_net_config(
530             project_name=self.os_creds.project_name,
531             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
532             router_name=guid + '-pub-router', external_net=self.ext_net_name)
533
534     def tearDown(self):
535         """
536         Cleans the remote OpenStack objects
537         """
538         if self.interface_router:
539             neutron_utils.remove_interface_router(
540                 self.neutron, self.router, self.network.subnets[0])
541
542         if self.router:
543             try:
544                 neutron_utils.delete_router(self.neutron, self.router)
545                 validate_router(
546                     self.neutron, self.keystone, self.router.name,
547                     self.os_creds.project_name, False)
548             except:
549                 pass
550
551         if self.port:
552             try:
553                 neutron_utils.delete_port(self.neutron, self.port)
554             except:
555                 pass
556
557         if self.network:
558             neutron_utils.delete_network(self.neutron, self.network)
559
560         super(self.__class__, self).__clean__()
561
562     def test_create_router_simple(self):
563         """
564         Tests the neutron_utils.create_router()
565         """
566         self.router = neutron_utils.create_router(
567             self.neutron, self.os_creds, self.net_config.router_settings)
568         validate_router(
569             self.neutron, self.keystone, self.net_config.router_settings.name,
570             self.os_creds.project_name, True)
571
572     def test_create_router_with_public_interface(self):
573         """
574         Tests the neutron_utils.create_router() function with a pubic interface
575         """
576         subnet_setting = self.net_config.network_settings.subnet_settings[0]
577         self.net_config = openstack_tests.OSNetworkConfig(
578             self.os_creds.project_name, self.net_config.network_settings.name,
579             subnet_setting.name, subnet_setting.cidr,
580             self.net_config.router_settings.name, self.ext_net_name)
581         self.router = neutron_utils.create_router(
582             self.neutron, self.os_creds, self.net_config.router_settings)
583         validate_router(
584             self.neutron, self.keystone, self.net_config.router_settings.name,
585             self.os_creds.project_name, True)
586
587         ext_net = neutron_utils.get_network(
588             self.neutron, self.keystone, network_name=self.ext_net_name)
589         self.assertEqual(self.router.external_network_id, ext_net.id)
590
591     def test_add_interface_router(self):
592         """
593         Tests the neutron_utils.add_interface_router() function
594         """
595         self.network = neutron_utils.create_network(
596             self.neutron, self.os_creds, self.net_config.network_settings)
597         self.assertEqual(self.net_config.network_settings.name,
598                          self.network.name)
599         self.assertTrue(validate_network(
600             self.neutron, self.keystone,
601             self.net_config.network_settings.name, True,
602             self.os_creds.project_name))
603
604         subnet_setting = self.net_config.network_settings.subnet_settings[0]
605         self.assertTrue(validate_subnet(
606             self.neutron, self.network, subnet_setting.name,
607             subnet_setting.cidr, True))
608
609         self.router = neutron_utils.create_router(
610             self.neutron, self.os_creds, self.net_config.router_settings)
611         validate_router(
612             self.neutron, self.keystone, self.net_config.router_settings.name,
613             self.os_creds.project_name, True)
614
615         self.interface_router = neutron_utils.add_interface_router(
616             self.neutron, self.router, self.network.subnets[0])
617         validate_interface_router(self.interface_router, self.router,
618                                   self.network.subnets[0])
619
620     def test_add_interface_router_null_router(self):
621         """
622         Tests the neutron_utils.add_interface_router() function for an
623         Exception when the router value is None
624         """
625         self.network = neutron_utils.create_network(
626             self.neutron, self.os_creds, self.net_config.network_settings)
627         self.assertEqual(self.net_config.network_settings.name,
628                          self.network.name)
629         self.assertTrue(validate_network(
630             self.neutron, self.keystone,
631             self.net_config.network_settings.name, True,
632             self.os_creds.project_name))
633
634         subnet_setting = self.net_config.network_settings.subnet_settings[0]
635         self.assertTrue(validate_subnet(
636             self.neutron, self.network, subnet_setting.name,
637             subnet_setting.cidr, True))
638
639         with self.assertRaises(NeutronException):
640             self.interface_router = neutron_utils.add_interface_router(
641                 self.neutron, self.router, self.network.subnets[0])
642
643     def test_add_interface_router_null_subnet(self):
644         """
645         Tests the neutron_utils.add_interface_router() function for an
646         Exception when the subnet value is None
647         """
648         self.network = neutron_utils.create_network(
649             self.neutron, self.os_creds, self.net_config.network_settings)
650         self.assertEqual(self.net_config.network_settings.name,
651                          self.network.name)
652         self.assertTrue(validate_network(
653             self.neutron, self.keystone,
654             self.net_config.network_settings.name, True,
655             self.os_creds.project_name))
656
657         self.router = neutron_utils.create_router(
658             self.neutron, self.os_creds, self.net_config.router_settings)
659         validate_router(
660             self.neutron, self.keystone, self.net_config.router_settings.name,
661             self.os_creds.project_name, True)
662
663         with self.assertRaises(NeutronException):
664             self.interface_router = neutron_utils.add_interface_router(
665                 self.neutron, self.router, None)
666
667     def test_add_interface_router_missing_subnet(self):
668         """
669         Tests the neutron_utils.add_interface_router() function for an
670         Exception when the subnet object has been deleted
671         """
672         self.network = neutron_utils.create_network(
673             self.neutron, self.os_creds, self.net_config.network_settings)
674         self.assertEqual(self.net_config.network_settings.name,
675                          self.network.name)
676         self.assertTrue(validate_network(
677             self.neutron, self.keystone,
678             self.net_config.network_settings.name, True,
679             self.os_creds.project_name))
680
681         self.router = neutron_utils.create_router(
682             self.neutron, self.os_creds, self.net_config.router_settings)
683         validate_router(
684             self.neutron, self.keystone, self.net_config.router_settings.name,
685             self.os_creds.project_name, True)
686
687         for subnet in self.network.subnets:
688             neutron_utils.delete_subnet(self.neutron, subnet)
689
690         with self.assertRaises(NotFound):
691             self.interface_router = neutron_utils.add_interface_router(
692                 self.neutron, self.router, self.network.subnets[0])
693
694     def test_create_port(self):
695         """
696         Tests the neutron_utils.create_port() function
697         """
698         self.network = neutron_utils.create_network(
699             self.neutron, self.os_creds, self.net_config.network_settings)
700         self.assertEqual(self.net_config.network_settings.name,
701                          self.network.name)
702         self.assertTrue(validate_network(
703             self.neutron, self.keystone,
704             self.net_config.network_settings.name, True,
705             self.os_creds.project_name))
706
707         subnet_setting = self.net_config.network_settings.subnet_settings[0]
708         self.assertTrue(validate_subnet(
709             self.neutron, self.network, subnet_setting.name,
710             subnet_setting.cidr, True))
711
712         self.port = neutron_utils.create_port(
713             self.neutron, self.os_creds, PortConfig(
714                 name=self.port_name,
715                 ip_addrs=[{
716                     'subnet_name': subnet_setting.name,
717                     'ip': ip_1}],
718                 network_name=self.net_config.network_settings.name))
719         validate_port(self.neutron, self.port, self.port_name)
720
721     def test_create_port_empty_name(self):
722         """
723         Tests the neutron_utils.create_port() function
724         """
725         self.network = neutron_utils.create_network(
726             self.neutron, self.os_creds, self.net_config.network_settings)
727         self.assertEqual(self.net_config.network_settings.name,
728                          self.network.name)
729         self.assertTrue(validate_network(
730             self.neutron, self.keystone,
731             self.net_config.network_settings.name, True,
732             self.os_creds.project_name))
733
734         subnet_setting = self.net_config.network_settings.subnet_settings[0]
735         self.assertTrue(validate_subnet(
736             self.neutron, self.network, subnet_setting.name,
737             subnet_setting.cidr, True))
738
739         self.port = neutron_utils.create_port(
740             self.neutron, self.os_creds, PortConfig(
741                 name=self.port_name,
742                 network_name=self.net_config.network_settings.name,
743                 ip_addrs=[{
744                     'subnet_name': subnet_setting.name,
745                     'ip': ip_1}]))
746         validate_port(self.neutron, self.port, self.port_name)
747
748     def test_create_port_null_name(self):
749         """
750         Tests the neutron_utils.create_port() when the port name value is None
751         """
752         self.network = neutron_utils.create_network(
753             self.neutron, self.os_creds, self.net_config.network_settings)
754         self.assertEqual(self.net_config.network_settings.name,
755                          self.network.name)
756         self.assertTrue(validate_network(
757             self.neutron, self.keystone,
758             self.net_config.network_settings.name, True,
759             self.os_creds.project_name))
760
761         subnet_setting = self.net_config.network_settings.subnet_settings[0]
762         self.assertTrue(validate_subnet(
763             self.neutron, self.network, subnet_setting.name,
764             subnet_setting.cidr, True))
765
766         self.port = neutron_utils.create_port(
767             self.neutron, self.os_creds,
768             PortConfig(
769                 network_name=self.net_config.network_settings.name,
770                 ip_addrs=[{
771                     'subnet_name': subnet_setting.name,
772                     'ip': ip_1}]))
773
774         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
775         self.assertEqual(self.port, port)
776
777     def test_create_port_null_network_object(self):
778         """
779         Tests the neutron_utils.create_port() function for an Exception when
780         the network object is None
781         """
782         with self.assertRaises(Exception):
783             self.port = neutron_utils.create_port(
784                 self.neutron, self.os_creds,
785                 PortConfig(
786                     name=self.port_name,
787                     network_name=self.net_config.network_settings.name,
788                     ip_addrs=[{
789                         'subnet_name':
790                             self.net_config.network_settings.subnet_settings[
791                                 0].name,
792                         'ip': ip_1}]))
793
794     def test_create_port_null_ip(self):
795         """
796         Tests the neutron_utils.create_port() function for an Exception when
797         the IP value is None
798         """
799         self.network = neutron_utils.create_network(
800             self.neutron, self.os_creds, self.net_config.network_settings)
801         self.assertEqual(self.net_config.network_settings.name,
802                          self.network.name)
803         self.assertTrue(validate_network(
804             self.neutron, self.keystone,
805             self.net_config.network_settings.name, True,
806             self.os_creds.project_name))
807
808         subnet_setting = self.net_config.network_settings.subnet_settings[0]
809         self.assertTrue(validate_subnet(
810             self.neutron, self.network, subnet_setting.name,
811             subnet_setting.cidr, True))
812
813         with self.assertRaises(Exception):
814             self.port = neutron_utils.create_port(
815                 self.neutron, self.os_creds,
816                 PortConfig(
817                     name=self.port_name,
818                     network_name=self.net_config.network_settings.name,
819                     ip_addrs=[{
820                         'subnet_name': subnet_setting.name,
821                         'ip': None}]))
822
823     def test_create_port_invalid_ip(self):
824         """
825         Tests the neutron_utils.create_port() function for an Exception when
826         the IP value is None
827         """
828         self.network = neutron_utils.create_network(
829             self.neutron, self.os_creds, self.net_config.network_settings)
830         self.assertEqual(self.net_config.network_settings.name,
831                          self.network.name)
832         self.assertTrue(validate_network(
833             self.neutron, self.keystone,
834             self.net_config.network_settings.name, True,
835             self.os_creds.project_name))
836
837         subnet_setting = self.net_config.network_settings.subnet_settings[0]
838         self.assertTrue(validate_subnet(
839             self.neutron, self.network, subnet_setting.name,
840             subnet_setting.cidr, True))
841
842         with self.assertRaises(Exception):
843             self.port = neutron_utils.create_port(
844                 self.neutron, self.os_creds,
845                 PortConfig(
846                     name=self.port_name,
847                     network_name=self.net_config.network_settings.name,
848                     ip_addrs=[{
849                         'subnet_name': subnet_setting.name,
850                         'ip': 'foo'}]))
851
852     def test_create_port_invalid_ip_to_subnet(self):
853         """
854         Tests the neutron_utils.create_port() function for an Exception when
855         the IP value is None
856         """
857         self.network = neutron_utils.create_network(
858             self.neutron, self.os_creds, self.net_config.network_settings)
859         self.assertEqual(self.net_config.network_settings.name,
860                          self.network.name)
861         self.assertTrue(validate_network(
862             self.neutron, self.keystone,
863             self.net_config.network_settings.name, True,
864             self.os_creds.project_name))
865
866         subnet_setting = self.net_config.network_settings.subnet_settings[0]
867         self.assertTrue(validate_subnet(
868             self.neutron, self.network, subnet_setting.name,
869             subnet_setting.cidr, True))
870
871         with self.assertRaises(Exception):
872             self.port = neutron_utils.create_port(
873                 self.neutron, self.os_creds,
874                 PortConfig(
875                     name=self.port_name,
876                     network_name=self.net_config.network_settings.name,
877                     ip_addrs=[{
878                         'subnet_name': subnet_setting.name,
879                         'ip': '10.197.123.100'}]))
880
881
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients plus a unique security group
        name. Objects created by the tests are tracked in
        self.security_groups and self.security_group_rules for tearDown()
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: the group may already be gone.
                # 'Exception' rather than a bare except so that
                # KeyboardInterrupt/SystemExit still propagate
                pass

        # Name the class explicitly; super(self.__class__, self) resolves
        # against the runtime type and misbehaves once this class is
        # subclassed
        super(NeutronUtilsSecurityGroupTests, self).__clean__()

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings)
        # Track for tearDown() so the group is not leaked when one of the
        # assertions below fails before the explicit delete
        self.security_groups.append(security_group)

        # assertEqual, not assertTrue: assertTrue(a, b) only checks that 'a'
        # is truthy and treats 'b' as the failure message
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        self.security_groups.remove(security_group)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupConfig constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupConfig()
            self.security_groups.append(
                neutron_utils.create_security_group(
                    self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function for a
        security group configured with a description but without any rules
        """
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.create_security_group_rule() functions for a security
        group configured with a single ingress rule
        """
        sec_grp_rule_settings = SecurityGroupRuleConfig(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        # Rules already attached to the freshly created group are tracked so
        # that they take part in the comparison below and in the cleanup
        self.security_group_rules.extend(
            neutron_utils.get_rules_by_security_group(
                self.neutron, self.security_groups[0]))

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, self.keystone,
                sec_grp_settings.rule_settings[0],
                self.os_creds.project_name))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        # assertEqual, not assertTrue: see test_create_delete_simple_sec_grp
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function with two
        security groups to ensure each ID returns its own group
        """
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-1', description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-2', description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)

    def test_create_list_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() and
        list_security_groups function
        """
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name + "-1", description='hello group')
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings))

        sec_grp_settings2 = SecurityGroupConfig(
            name=self.sec_grp_name + "-2", description='hola group')
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings2))

        returned_sec_groups = neutron_utils.list_security_groups(self.neutron)

        self.assertIsNotNone(returned_sec_groups)

        # Each of the two groups created above must be listed exactly once
        expected_names = (sec_grp_settings.name, sec_grp_settings2.name)
        worked = sum(
            1 for sg in returned_sec_groups if sg.name in expected_names)

        self.assertEqual(worked, 2)
1058
1059
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions in neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients used by the tests and
        initializes the floating IP that tearDown() is responsible for
        removing
        """
        self.neutron = neutron_utils.neutron_client(
            self.os_creds, self.os_session)
        self.keystone = keystone_utils.keystone_client(
            self.os_creds, self.os_session)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if any
        """
        if self.floating_ip:
            try:
                neutron_utils.delete_floating_ip(
                    self.neutron, self.floating_ip)
            except Exception:
                # Best-effort cleanup. 'Exception' rather than a bare except
                # so that KeyboardInterrupt/SystemExit still propagate
                pass

        # Name the class explicitly; super(self.__class__, self) resolves
        # against the runtime type and misbehaves once this class is
        # subclassed
        super(NeutronUtilsFloatingIpTests, self).__clean__()

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP on the external network and its
        retrieval via get_floating_ips() and get_floating_ip()
        :return:
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.keystone, self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
1104
1105
1106 """
1107 Validation routines
1108 """
1109
1110
def validate_network(neutron, keystone, name, exists, project_name):
    """
    Checks whether the presence of a network with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :param project_name: the associated project name
    :return: True when the network was found and exists is true, or when it
             was not found and exists is false; otherwise False
    """
    found = neutron_utils.get_network(
        neutron, keystone, network_name=name, project_name=project_name)
    return bool(found) == bool(exists)
1130
1131
def validate_subnet(neutron, network, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name on the given
    network matches the expectation expressed by the exists parameter. When
    the subnet is expected and found, its CIDR must match as well.
    :param neutron: The neutron client
    :param network: The SNAPS-OO Network domain object
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet is expected to exist
    :return: True/False
    """
    found = neutron_utils.get_subnet(neutron, network, subnet_name=name)

    if not exists:
        # Valid only when nothing was found
        return not found

    if found and found.name == name:
        return found.cidr == cidr
    return False
1151
1152
def validate_router(neutron, keystone, name, project_name, exists):
    """
    Checks whether the presence of a router with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected router name
    :param project_name: The name of the project in which the router should
                         exist
    :param exists: Whether or not the router is expected to exist
    :return: True when the router was found and exists is true, or when it
             was not found and exists is false; otherwise False
    """
    router = neutron_utils.get_router(
        neutron, keystone, router_name=name, project_name=project_name)
    # Previously an expected-absent router (exists=False) always produced
    # False, even when the router really was absent
    return bool(exists) == bool(router)
1171
1172
def validate_interface_router(interface_router, router, subnet):
    """
    Compares the IDs held by the interface router object against the given
    router and subnet
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # NOTE(review): the value compared against router.id is read from the
    # 'port_id' attribute - confirm that this is the intended field
    expected_subnet_id, expected_router_id = (
        interface_router.subnet_id, interface_router.port_id)

    if subnet.id != expected_subnet_id:
        return False
    return router.id == expected_router_id
1186
1187
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up the given port by ID in the result of neutron.list_ports() and
    checks its name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True if the first listed port with a matching ID has the
             expected name; False if the name differs or no port matches
    """
    name_checks = (
        listed_port['name'] == this_port_name
        for listed_ports in neutron.list_ports().values()
        for listed_port in listed_ports
        if listed_port['id'] == port_obj.id)

    # Only the first port with a matching ID decides the outcome
    return next(name_checks, False)