Ensure the project for routers is handled properly.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21     SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and expects a network
        named self.ext_net_name among the listed networks.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listing = client.list_networks()

        ext_net_listed = any(
            net.get('name') == self.ext_net_name
            for net in listing.get('networks'))
        self.assertTrue(ext_net_listed)

    def test_neutron_connect_fail(self):
        """
        Expects an Exception when a client built from made-up credentials
        is asked to list networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bogus_creds = OSCreds(
                username='user', password='pass', auth_url='url',
                project_name='project')
            client = neutron_utils.neutron_client(bogus_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Calls neutron_utils.get_external_networks() and expects one of the
        returned networks to carry the configured self.ext_net_name
        :return:
        """
        client = neutron_utils.neutron_client(self.os_creds)
        ext_net_listed = any(
            ext_net.name == self.ext_net_name
            for ext_net in neutron_utils.get_external_networks(client))
        self.assertTrue(ext_net_listed)
81
82
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Exercises network creation through neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron/keystone clients and a uniquely named network
        configuration for each test
        """
        guid = '-'.join((self.__class__.__name__, str(uuid.uuid4())))
        self.port_name = guid + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Deletes the network created by the test, if any
        """
        if not self.network:
            return
        neutron_utils.delete_network(self.neutron, self.network)

    def test_create_network(self):
        """
        Creates a network with neutron_utils.create_network() and checks
        its name, its visibility via validate_network() and its number of
        subnets
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network.name)
        net_is_valid = validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name)
        self.assertTrue(net_is_valid)
        self.assertEqual(len(net_settings.subnet_settings),
                         len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Expects an Exception from neutron_utils.create_network() when the
        network settings are built with an empty name
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig(name=''))

    def test_create_network_null_name(self):
        """
        Expects an Exception from neutron_utils.create_network() when the
        network settings are built without any name
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig())
138
139
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron/keystone clients and a uniquely named network
        configuration (network + subnet) for each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Deliberate best-effort cleanup. Only Exception is caught
                # so KeyboardInterrupt/SystemExit still stop the run.
                pass

    def _create_validated_network(self):
        """
        Creates the configured network, stores it in self.network and
        asserts its name and that validate_network() finds it
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, self.keystone, net_settings.name, True,
            self.os_creds.project_name))

    def _check_subnet_lookups(self, subnet_setting):
        """
        Asserts that looking the subnet up by name and by network both
        return the first subnet of self.network
        :param subnet_setting: the settings object of the expected subnet
        """
        by_name = neutron_utils.get_subnet(
            self.neutron, subnet_name=subnet_setting.name)
        self.assertEqual(self.network.subnets[0], by_name)

        by_network = neutron_utils.get_subnets_by_network(
            self.neutron, self.network)
        self.assertIsNotNone(by_network)
        self.assertEqual(1, len(by_network))
        self.assertEqual(self.network.subnets[0], by_network[0])

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_network() function for a network
        with one subnet and checks the subnet lookup functions
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))

        self._check_subnet_lookups(subnet_setting)

    def test_create_subnet_null_name(self):
        """
        Creates the network, then expects an Exception when a SubnetConfig
        is constructed with a CIDR but no name
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetConfig(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the network with its subnet and checks that
        validate_subnet() returns a false value when queried with an empty
        subnet name, while the real subnet is still found
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, '', subnet_setting.cidr, True))

        self._check_subnet_lookups(subnet_setting)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is None
        """
        self.net_config.network_settings.subnet_settings[0].cidr = None
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is empty
        """
        self.net_config.network_settings.subnet_settings[0].cidr = ''
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)
258
259
class NeutronUtilsIPv6Tests(OSComponentTestCase):
    """
    Test for creating IPv6 networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a unique name prefix for each test
        """
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Deliberate best-effort cleanup. Only Exception is caught
                # so KeyboardInterrupt/SystemExit still stop the run.
                pass

    def _build_network_settings(self, **subnet_kwargs):
        """
        Stores in self.network_settings a NetworkConfig holding one IPv6
        SubnetConfig built from the given keyword arguments
        :param subnet_kwargs: extra SubnetConfig arguments (cidr, start...)
        :return: the SubnetConfig object that was created
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', ip_version=6, **subnet_kwargs)
        self.network_settings = NetworkConfig(
            name=self.guid + '-net', subnet_settings=[sub_setting])
        return sub_setting

    def _create_network(self):
        """
        Creates the network described by self.network_settings and stores
        it in self.network
        """
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

    def _assert_create_fails(self, **subnet_kwargs):
        """
        Asserts that creating a network with the given subnet arguments
        raises BadRequest
        :param subnet_kwargs: extra SubnetConfig arguments
        """
        self._build_network_settings(**subnet_kwargs)
        with self.assertRaises(BadRequest):
            self._create_network()

    def _check_dhcp_network(self, ipv6_mode):
        """
        Creates a DHCP-enabled IPv6 network whose RA and address modes are
        both ipv6_mode, then compares the created subnet with its settings
        :param ipv6_mode: value used for ipv6_ra_mode and ipv6_address_mode
        """
        sub_setting = self._build_network_settings(
            cidr='1:1:0:0:0:0:0:0/64',
            dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=True, ipv6_ra_mode=ipv6_mode,
            ipv6_address_mode=ipv6_mode)
        self._create_network()

        self.assertEqual(self.network_settings.name, self.network.name)

        subnet_settings = self.network_settings.subnet_settings[0]
        self.assertEqual(1, len(self.network.subnets))
        subnet = self.network.subnets[0]

        self.assertEqual(self.network.id, subnet.network_id)
        self.assertEqual(subnet_settings.name, subnet.name)
        self.assertEqual(subnet_settings.start, subnet.start)
        self.assertEqual(subnet_settings.end, subnet.end)
        self.assertEqual('1:1::/64', subnet.cidr)
        self.assertEqual(6, subnet.ip_version)
        self.assertEqual(1, len(subnet.dns_nameservers))
        self.assertEqual(
            sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
        self.assertTrue(subnet.enable_dhcp)
        self.assertEqual(
            subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
        self.assertEqual(
            subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)

    def _check_default_pool(self, **pool_kwargs):
        """
        Creates a '1:1::/48' network with the given (invalid) pool
        arguments and asserts the start/end addresses of the created subnet
        :param pool_kwargs: the 'start' or 'end' SubnetConfig argument
        """
        self._build_network_settings(cidr='1:1::/48', **pool_kwargs)
        self._create_network()

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_slaac(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are slaac
        """
        self._check_dhcp_network('slaac')

    def test_create_network_stateful(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are stateful
        """
        self._check_dhcp_network('dhcpv6-stateful')

    def test_create_network_stateless(self):
        """
        Tests the neutron_utils.create_network() when DHCP is enabled and
        the RA and address modes are both 'dhcpv6-stateless'
        """
        self._check_dhcp_network('dhcpv6-stateless')

    def test_create_network_no_dhcp_slaac(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        DHCP is not enabled and the RA and address modes are both 'slaac'
        """
        self._assert_create_fails(
            cidr='1:1:0:0:0:0:0:0/64',
            dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=False, ipv6_ra_mode='slaac',
            ipv6_address_mode='slaac')

    def test_create_network_invalid_start_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid start IP to ensure Neutron assigns it the smallest IP
        possible
        """
        self._check_default_pool(start='foo')

    def test_create_network_invalid_end_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid end IP to ensure Neutron assigns it the largest IP
        possible
        """
        self._check_default_pool(end='bar')

    def test_create_network_with_bad_cidr(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet CIDR is invalid
        """
        self._assert_create_fails(cidr='1:1:1:/48')

    def test_create_network_invalid_gateway_ip(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet gateway IP is invalid
        """
        self._assert_create_fails(cidr='1:1::/48', gateway_ip='192.168.0.1')

    def test_create_network_with_bad_dns(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the DNS IP is invalid
        """
        self._assert_create_fails(cidr='1:1::/48', dns_nameservers=['foo'])
490
491
492 class NeutronUtilsRouterTests(OSComponentTestCase):
493     """
494     Test for creating routers via neutron_utils.py
495     """
496
497     def setUp(self):
498         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
499         self.port_name = str(guid) + '-port'
500         self.neutron = neutron_utils.neutron_client(self.os_creds)
501         self.keystone = keystone_utils.keystone_client(self.os_creds)
502         self.network = None
503         self.port = None
504         self.router = None
505         self.interface_router = None
506         self.net_config = openstack_tests.get_pub_net_config(
507             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
508             router_name=guid + '-pub-router', external_net=self.ext_net_name)
509
510     def tearDown(self):
511         """
512         Cleans the remote OpenStack objects
513         """
514         if self.interface_router:
515             neutron_utils.remove_interface_router(
516                 self.neutron, self.router, self.network.subnets[0])
517
518         if self.router:
519             try:
520                 neutron_utils.delete_router(self.neutron, self.router)
521                 validate_router(
522                     self.neutron, self.keystone, self.router.name,
523                     self.os_creds.project_name, False)
524             except:
525                 pass
526
527         if self.port:
528             try:
529                 neutron_utils.delete_port(self.neutron, self.port)
530             except:
531                 pass
532
533         if self.network:
534             neutron_utils.delete_network(self.neutron, self.network)
535
536     def test_create_router_simple(self):
537         """
538         Tests the neutron_utils.create_router()
539         """
540         self.router = neutron_utils.create_router(
541             self.neutron, self.os_creds, self.net_config.router_settings)
542         validate_router(
543             self.neutron, self.keystone, self.net_config.router_settings.name,
544             self.os_creds.project_name, True)
545
546     def test_create_router_with_public_interface(self):
547         """
548         Tests the neutron_utils.create_router() function with a pubic interface
549         """
550         subnet_setting = self.net_config.network_settings.subnet_settings[0]
551         self.net_config = openstack_tests.OSNetworkConfig(
552             self.net_config.network_settings.name, subnet_setting.name,
553             subnet_setting.cidr, self.net_config.router_settings.name,
554             self.ext_net_name)
555         self.router = neutron_utils.create_router(
556             self.neutron, self.os_creds, self.net_config.router_settings)
557         validate_router(
558             self.neutron, self.keystone, self.net_config.router_settings.name,
559             self.os_creds.project_name, True)
560
561         ext_net = neutron_utils.get_network(
562             self.neutron, self.keystone, network_name=self.ext_net_name)
563         self.assertEqual(self.router.external_network_id, ext_net.id)
564
565     def test_add_interface_router(self):
566         """
567         Tests the neutron_utils.add_interface_router() function
568         """
569         self.network = neutron_utils.create_network(
570             self.neutron, self.os_creds, self.net_config.network_settings)
571         self.assertEqual(self.net_config.network_settings.name,
572                          self.network.name)
573         self.assertTrue(validate_network(
574             self.neutron, self.keystone,
575             self.net_config.network_settings.name, True,
576             self.os_creds.project_name))
577
578         subnet_setting = self.net_config.network_settings.subnet_settings[0]
579         self.assertTrue(validate_subnet(
580             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
581
582         self.router = neutron_utils.create_router(
583             self.neutron, self.os_creds, self.net_config.router_settings)
584         validate_router(
585             self.neutron, self.keystone, self.net_config.router_settings.name,
586             self.os_creds.project_name, True)
587
588         self.interface_router = neutron_utils.add_interface_router(
589             self.neutron, self.router, self.network.subnets[0])
590         validate_interface_router(self.interface_router, self.router,
591                                   self.network.subnets[0])
592
593     def test_add_interface_router_null_router(self):
594         """
595         Tests the neutron_utils.add_interface_router() function for an
596         Exception when the router value is None
597         """
598         self.network = neutron_utils.create_network(
599             self.neutron, self.os_creds, self.net_config.network_settings)
600         self.assertEqual(self.net_config.network_settings.name,
601                          self.network.name)
602         self.assertTrue(validate_network(
603             self.neutron, self.keystone,
604             self.net_config.network_settings.name, True,
605             self.os_creds.project_name))
606
607         subnet_setting = self.net_config.network_settings.subnet_settings[0]
608         self.assertTrue(validate_subnet(
609             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
610
611         with self.assertRaises(NeutronException):
612             self.interface_router = neutron_utils.add_interface_router(
613                 self.neutron, self.router, self.network.subnets[0])
614
615     def test_add_interface_router_null_subnet(self):
616         """
617         Tests the neutron_utils.add_interface_router() function for an
618         Exception when the subnet value is None
619         """
620         self.network = neutron_utils.create_network(
621             self.neutron, self.os_creds, self.net_config.network_settings)
622         self.assertEqual(self.net_config.network_settings.name,
623                          self.network.name)
624         self.assertTrue(validate_network(
625             self.neutron, self.keystone,
626             self.net_config.network_settings.name, True,
627             self.os_creds.project_name))
628
629         self.router = neutron_utils.create_router(
630             self.neutron, self.os_creds, self.net_config.router_settings)
631         validate_router(
632             self.neutron, self.keystone, self.net_config.router_settings.name,
633             self.os_creds.project_name, True)
634
635         with self.assertRaises(NeutronException):
636             self.interface_router = neutron_utils.add_interface_router(
637                 self.neutron, self.router, None)
638
639     def test_add_interface_router_missing_subnet(self):
640         """
641         Tests the neutron_utils.add_interface_router() function for an
642         Exception when the subnet object has been deleted
643         """
644         self.network = neutron_utils.create_network(
645             self.neutron, self.os_creds, self.net_config.network_settings)
646         self.assertEqual(self.net_config.network_settings.name,
647                          self.network.name)
648         self.assertTrue(validate_network(
649             self.neutron, self.keystone,
650             self.net_config.network_settings.name, True,
651             self.os_creds.project_name))
652
653         self.router = neutron_utils.create_router(
654             self.neutron, self.os_creds, self.net_config.router_settings)
655         validate_router(
656             self.neutron, self.keystone, self.net_config.router_settings.name,
657             self.os_creds.project_name, True)
658
659         for subnet in self.network.subnets:
660             neutron_utils.delete_subnet(self.neutron, subnet)
661
662         with self.assertRaises(NotFound):
663             self.interface_router = neutron_utils.add_interface_router(
664                 self.neutron, self.router, self.network.subnets[0])
665
666     def test_create_port(self):
667         """
668         Tests the neutron_utils.create_port() function
669         """
670         self.network = neutron_utils.create_network(
671             self.neutron, self.os_creds, self.net_config.network_settings)
672         self.assertEqual(self.net_config.network_settings.name,
673                          self.network.name)
674         self.assertTrue(validate_network(
675             self.neutron, self.keystone,
676             self.net_config.network_settings.name, True,
677             self.os_creds.project_name))
678
679         subnet_setting = self.net_config.network_settings.subnet_settings[0]
680         self.assertTrue(validate_subnet(
681             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
682
683         self.port = neutron_utils.create_port(
684             self.neutron, self.os_creds, PortConfig(
685                 name=self.port_name,
686                 ip_addrs=[{
687                     'subnet_name': subnet_setting.name,
688                     'ip': ip_1}],
689                 network_name=self.net_config.network_settings.name))
690         validate_port(self.neutron, self.port, self.port_name)
691
692     def test_create_port_empty_name(self):
693         """
694         Tests the neutron_utils.create_port() function
695         """
696         self.network = neutron_utils.create_network(
697             self.neutron, self.os_creds, self.net_config.network_settings)
698         self.assertEqual(self.net_config.network_settings.name,
699                          self.network.name)
700         self.assertTrue(validate_network(
701             self.neutron, self.keystone,
702             self.net_config.network_settings.name, True,
703             self.os_creds.project_name))
704
705         subnet_setting = self.net_config.network_settings.subnet_settings[0]
706         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
707                                         subnet_setting.cidr, True))
708
709         self.port = neutron_utils.create_port(
710             self.neutron, self.os_creds, PortConfig(
711                 name=self.port_name,
712                 network_name=self.net_config.network_settings.name,
713                 ip_addrs=[{
714                     'subnet_name': subnet_setting.name,
715                     'ip': ip_1}]))
716         validate_port(self.neutron, self.port, self.port_name)
717
718     def test_create_port_null_name(self):
719         """
720         Tests the neutron_utils.create_port() when the port name value is None
721         """
722         self.network = neutron_utils.create_network(
723             self.neutron, self.os_creds, self.net_config.network_settings)
724         self.assertEqual(self.net_config.network_settings.name,
725                          self.network.name)
726         self.assertTrue(validate_network(
727             self.neutron, self.keystone,
728             self.net_config.network_settings.name, True,
729             self.os_creds.project_name))
730
731         subnet_setting = self.net_config.network_settings.subnet_settings[0]
732         self.assertTrue(validate_subnet(
733             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
734
735         self.port = neutron_utils.create_port(
736             self.neutron, self.os_creds,
737             PortConfig(
738                 network_name=self.net_config.network_settings.name,
739                 ip_addrs=[{
740                     'subnet_name': subnet_setting.name,
741                     'ip': ip_1}]))
742
743         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
744         self.assertEqual(self.port, port)
745
746     def test_create_port_null_network_object(self):
747         """
748         Tests the neutron_utils.create_port() function for an Exception when
749         the network object is None
750         """
751         with self.assertRaises(Exception):
752             self.port = neutron_utils.create_port(
753                 self.neutron, self.os_creds,
754                 PortConfig(
755                     name=self.port_name,
756                     network_name=self.net_config.network_settings.name,
757                     ip_addrs=[{
758                         'subnet_name':
759                             self.net_config.network_settings.subnet_settings[
760                                 0].name,
761                         'ip': ip_1}]))
762
763     def test_create_port_null_ip(self):
764         """
765         Tests the neutron_utils.create_port() function for an Exception when
766         the IP value is None
767         """
768         self.network = neutron_utils.create_network(
769             self.neutron, self.os_creds, self.net_config.network_settings)
770         self.assertEqual(self.net_config.network_settings.name,
771                          self.network.name)
772         self.assertTrue(validate_network(
773             self.neutron, self.keystone,
774             self.net_config.network_settings.name, True,
775             self.os_creds.project_name))
776
777         subnet_setting = self.net_config.network_settings.subnet_settings[0]
778         self.assertTrue(validate_subnet(
779             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
780
781         with self.assertRaises(Exception):
782             self.port = neutron_utils.create_port(
783                 self.neutron, self.os_creds,
784                 PortConfig(
785                     name=self.port_name,
786                     network_name=self.net_config.network_settings.name,
787                     ip_addrs=[{
788                         'subnet_name': subnet_setting.name,
789                         'ip': None}]))
790
791     def test_create_port_invalid_ip(self):
792         """
793         Tests the neutron_utils.create_port() function for an Exception when
794         the IP value is None
795         """
796         self.network = neutron_utils.create_network(
797             self.neutron, self.os_creds, self.net_config.network_settings)
798         self.assertEqual(self.net_config.network_settings.name,
799                          self.network.name)
800         self.assertTrue(validate_network(
801             self.neutron, self.keystone,
802             self.net_config.network_settings.name, True,
803             self.os_creds.project_name))
804
805         subnet_setting = self.net_config.network_settings.subnet_settings[0]
806         self.assertTrue(validate_subnet(
807             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
808
809         with self.assertRaises(Exception):
810             self.port = neutron_utils.create_port(
811                 self.neutron, self.os_creds,
812                 PortConfig(
813                     name=self.port_name,
814                     network_name=self.net_config.network_settings.name,
815                     ip_addrs=[{
816                         'subnet_name': subnet_setting.name,
817                         'ip': 'foo'}]))
818
819     def test_create_port_invalid_ip_to_subnet(self):
820         """
821         Tests the neutron_utils.create_port() function for an Exception when
822         the IP value is None
823         """
824         self.network = neutron_utils.create_network(
825             self.neutron, self.os_creds, self.net_config.network_settings)
826         self.assertEqual(self.net_config.network_settings.name,
827                          self.network.name)
828         self.assertTrue(validate_network(
829             self.neutron, self.keystone,
830             self.net_config.network_settings.name, True,
831             self.os_creds.project_name))
832
833         subnet_setting = self.net_config.network_settings.subnet_settings[0]
834         self.assertTrue(validate_subnet(
835             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
836
837         with self.assertRaises(Exception):
838             self.port = neutron_utils.create_port(
839                 self.neutron, self.os_creds,
840                 PortConfig(
841                     name=self.port_name,
842                     network_name=self.net_config.network_settings.name,
843                     ip_addrs=[{
844                         'subnet_name': subnet_setting.name,
845                         'ip': '10.197.123.100'}]))
846
847
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Generates a unique security group name, creates the neutron and
        keystone clients and initializes the lists tracking the security
        groups and rules that tearDown() must remove
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: the group may already have been
                # deleted by the test itself. Catching Exception rather than
                # using a bare except lets KeyboardInterrupt/SystemExit
                # propagate.
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings)
        # Track the group so it is still removed by tearDown() when an
        # assertion below fails before the explicit delete
        self.security_groups.append(security_group)

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupConfig constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupConfig()
            self.security_groups.append(
                neutron_utils.create_security_group(
                    self.neutron, self.keystone, sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function with a
        configuration that has a description but no rule settings
        """
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.create_security_group_rule() functions with a
        configuration holding a single ingress rule
        """

        sec_grp_rule_settings = SecurityGroupRuleConfig(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings))

        # Track the rules already attached to the new group so that they are
        # removed in tearDown() and included in the comparison below
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, self.keystone,
                sec_grp_settings.rule_settings[0],
                self.os_creds.project_name))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(
            self.neutron, security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function by
        creating two security groups and looking each one up by its ID
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-1', description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-2', description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
992
993
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions of neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients used by the test and
        initializes the floating IP reference that tearDown() cleans up
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if any
        """
        if self.floating_ip:
            try:
                neutron_utils.delete_floating_ip(
                    self.neutron, self.floating_ip)
            except Exception:
                # Best-effort cleanup. Catching Exception rather than using
                # a bare except lets KeyboardInterrupt/SystemExit propagate.
                pass

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP: the number of floating IPs must
        grow by one and the new floating IP must be retrievable with the same
        id and ip values
        :return:
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.keystone, self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
1034
1035
1036 """
1037 Validation routines
1038 """
1039
1040
def validate_network(neutron, keystone, name, exists, project_name):
    """
    Checks whether the presence of a network matches the expectation given
    by the 'exists' parameter. The network is looked up with
    neutron_utils.get_network() using the name and project name.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected network name
    :param exists: Whether or not a network with that name is expected to be
                   found
    :param project_name: the associated project name
    :return: True when 'exists' is truthy and a network was found or when
             'exists' is falsy and no network was found, else False
    """
    found = neutron_utils.get_network(
        neutron, keystone, network_name=name, project_name=project_name)
    if exists:
        return bool(found)
    return not found
1060
1061
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet matches the expectation given by
    the 'exists' parameter. The subnet is looked up with
    neutron_utils.get_subnet() using the name.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not a subnet with that name is expected to be
                   found
    :return: When 'exists' is truthy, the result of comparing the found
             subnet's CIDR with 'cidr' provided a subnet with a matching name
             was found, else False. When 'exists' is falsy, True only if no
             subnet was found.
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)
    if not exists:
        # Absence is expected, so any subnet that was found is a failure
        return not found
    if found and found.name == name:
        return found.cidr == cidr
    return False
1079
1080
def validate_router(neutron, keystone, name, project_name, exists):
    """
    Checks whether the presence of a router matches the expectation given by
    the 'exists' parameter. The router is looked up with
    neutron_utils.get_router() using the name and project name.
    :param neutron: The neutron client
    :param keystone: The keystone client
    :param name: The expected router name
    :param project_name: The name of the project in which the router should
                         exist
    :param exists: Whether or not a router with that name is expected to be
                   found
    :return: True when 'exists' is truthy and a router was found or when
             'exists' is falsy and no router was found, else False
    """
    router = neutron_utils.get_router(
        neutron, keystone, router_name=name, project_name=project_name)
    if exists:
        return bool(router)
    # Absence is the expected outcome here; mirrors validate_network()
    return not router
1099
1100
def validate_interface_router(interface_router, router, subnet):
    """
    Compares the IDs held by an interface router object with those of the
    given subnet and router
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: object whose 'id' is compared with the interface router's
                   'port_id'
    :param subnet: object whose 'id' is compared with the interface router's
                   'subnet_id'
    :return: True if both comparisons match else False
    """
    # Both attributes are read up front, before any comparison takes place
    expected_subnet_id = interface_router.subnet_id
    # NOTE(review): the router's ID is checked against 'port_id' - confirm
    # that this is the attribute that is meant to carry the router ID
    expected_router_id = interface_router.port_id

    if not subnet.id == expected_subnet_id:
        return False
    return router.id == expected_router_id
1114
1115
def validate_port(neutron, port_obj, this_port_name):
    """
    Searches every entry returned by neutron.list_ports() for the first one
    whose 'id' equals the ID of the given port object and compares that
    entry's 'name' with the expected name
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True if a port with the same ID was found and its name matches,
             else False
    """
    listing = neutron.list_ports()
    matching_entry = next(
        (entry
         for entries in listing.values()
         for entry in entries
         if entry['id'] == port_obj.id),
        None)
    if matching_entry is None:
        return False
    return matching_entry['name'] == this_port_name