Ensure project IDs are handled correctly for Network/Subnets
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.config.security_group import (
21     SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
__author__ = 'spisarski'

# Fixed IPv4 addresses used by the tests in this module. ip_1 is passed as the
# requested 'ip' of the ports created in the create_port tests below.
# NOTE(review): ip_2 is presumably used by tests further down the file - confirm
ip_1 = '10.55.1.100'
ip_2 = '10.55.1.200'
33
34
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Ensures a client built from the configured credentials can list
        networks and that the external network name is among the results.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        response = client.list_networks()

        # Every returned network dict is inspected (no early exit)
        matches = [
            net for net in response.get('networks')
            if net.get('name') == self.ext_net_name
        ]
        self.assertTrue(bool(matches))

    def test_neutron_connect_fail(self):
        """
        Ensures that listing networks with bogus credentials raises.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bad_creds = OSCreds(
                username='user', password='pass', auth_url='url',
                project_name='project')
            client = neutron_utils.neutron_client(bad_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Ensures the configured self.ext_net_name is the name of one of the
        networks returned by neutron_utils.get_external_networks()
        :return:
        """
        client = neutron_utils.neutron_client(self.os_creds)
        # any() stops at the first match, mirroring a loop with a break
        self.assertTrue(any(
            ext_net.name == self.ext_net_name
            for ext_net in neutron_utils.get_external_networks(client)))
81
82
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Exercises network creation through neutron_utils.py
    """

    def setUp(self):
        """
        Prepares a neutron client and a uniquely named network configuration
        """
        guid = '{}-{}'.format(self.__class__.__name__, uuid.uuid4())
        self.port_name = '{}-port'.format(guid)
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name='{}-pub-net'.format(guid))

    def tearDown(self):
        """
        Deletes the network created by the test, if any
        """
        if not self.network:
            return
        neutron_utils.delete_network(self.neutron, self.network)

    def test_create_network(self):
        """
        Creates a network with neutron_utils.create_network() and checks its
        name, that validate_network() accepts it, and its subnet count
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network.name)
        network_ok = validate_network(
            self.neutron, net_settings.name, True, self.project_id)
        self.assertTrue(network_ok)
        self.assertEqual(
            len(net_settings.subnet_settings), len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Expects an Exception when the network is configured with an empty
        name
        """
        with self.assertRaises(Exception):
            empty_named = NetworkConfig(name='')
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, network_settings=empty_named)

    def test_create_network_null_name(self):
        """
        Expects an Exception when the network is configured without a name
        """
        with self.assertRaises(Exception):
            unnamed = NetworkConfig()
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, network_settings=unnamed)
136
137
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Prepares a neutron client and a uniquely named network/subnet
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup: a failed delete must not fail the test.
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                pass

    def _create_validated_network(self):
        """
        Creates the configured network, stores it on self.network and asserts
        its name matches and that validate_network() accepts it
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, net_settings.name, True, self.project_id))

    def _assert_subnet_queries(self, subnet_setting):
        """
        Asserts that looking the subnet up by name and by network both return
        the single subnet attached to self.network
        :param subnet_setting: the subnet configuration used for creation
        """
        by_name = neutron_utils.get_subnet(
            self.neutron, subnet_name=subnet_setting.name)
        self.assertEqual(self.network.subnets[0], by_name)

        by_network = neutron_utils.get_subnets_by_network(
            self.neutron, self.network)
        self.assertIsNotNone(by_network)
        self.assertEqual(1, len(by_network))
        self.assertEqual(self.network.subnets[0], by_network[0])

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_network() function with one subnet and
        checks the subnet can be queried back
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))

        self._assert_subnet_queries(subnet_setting)

    def test_create_subnet_null_name(self):
        """
        Tests that constructing a SubnetConfig without a name raises an
        Exception
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetConfig(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Tests the neutron_utils.create_network() function and checks that
        validate_subnet() rejects an empty subnet name
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, '', subnet_setting.cidr, True))

        self._assert_subnet_queries(subnet_setting)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is None
        """
        self.net_config.network_settings.subnet_settings[0].cidr = None
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_network() function for an
        Exception when the subnet CIDR value is empty
        """
        self.net_config.network_settings.subnet_settings[0].cidr = ''
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)
252
253
class NeutronUtilsIPv6Tests(OSComponentTestCase):
    """
    Test for creating IPv6 networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Prepares a neutron client and a unique name prefix for the test
        """
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup: a failed delete must not fail the test.
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                pass

    def _dhcp_subnet_config(self, mode, enable_dhcp=True):
        """
        Builds the fully specified IPv6 subnet configuration shared by the
        DHCP tests
        :param mode: value used for both ipv6_ra_mode and ipv6_address_mode
        :param enable_dhcp: value of the subnet's enable_dhcp flag
        :return: the SubnetConfig object
        """
        return SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
            ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=enable_dhcp, ipv6_ra_mode=mode,
            ipv6_address_mode=mode)

    def _set_network_settings(self, sub_setting):
        """
        Stores on self.network_settings a NetworkConfig holding the given
        subnet as its only subnet
        :param sub_setting: the SubnetConfig to attach
        """
        self.network_settings = NetworkConfig(
            name=self.guid + '-net', subnet_settings=[sub_setting])

    def _check_dhcp_network(self, mode):
        """
        Creates a DHCP-enabled IPv6 network whose RA and address modes are
        both 'mode' and asserts the resulting subnet matches the settings
        :param mode: the IPv6 RA/address mode string
        """
        sub_setting = self._dhcp_subnet_config(mode)
        self._set_network_settings(sub_setting)

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)
        self.assertEqual(self.network_settings.name, self.network.name)

        # Read back from the NetworkConfig: the mode fields are compared via
        # their '.value' attribute below
        subnet_settings = self.network_settings.subnet_settings[0]
        self.assertEqual(1, len(self.network.subnets))
        subnet = self.network.subnets[0]

        self.assertEqual(self.network.id, subnet.network_id)
        self.assertEqual(subnet_settings.name, subnet.name)
        self.assertEqual(subnet_settings.start, subnet.start)
        self.assertEqual(subnet_settings.end, subnet.end)
        self.assertEqual('1:1::/64', subnet.cidr)
        self.assertEqual(6, subnet.ip_version)
        self.assertEqual(1, len(subnet.dns_nameservers))
        self.assertEqual(
            sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
        self.assertTrue(subnet.enable_dhcp)
        self.assertEqual(
            subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
        self.assertEqual(
            subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)

    def _check_default_pool(self, sub_setting):
        """
        Creates a network from the given subnet and asserts the allocation
        pool spans '1:1::2' to '1:1:0:ffff:ffff:ffff:ffff:ffff'
        :param sub_setting: the SubnetConfig to create
        """
        self._set_network_settings(sub_setting)

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def _expect_bad_request(self, sub_setting):
        """
        Asserts that creating a network from the given subnet raises
        BadRequest
        :param sub_setting: the SubnetConfig to create
        """
        self._set_network_settings(sub_setting)

        with self.assertRaises(BadRequest):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.network_settings)

    def test_create_network_slaac(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are slaac
        """
        self._check_dhcp_network('slaac')

    def test_create_network_stateful(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are stateful
        """
        self._check_dhcp_network('dhcpv6-stateful')

    def test_create_network_stateless(self):
        """
        Tests the neutron_utils.create_network() when DHCP is enabled and
        the RA and address modes are both 'dhcpv6-stateless'
        """
        self._check_dhcp_network('dhcpv6-stateless')

    def test_create_network_no_dhcp_slaac(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        DHCP is not enabled and the RA and address modes are both 'slaac'
        """
        self._expect_bad_request(
            self._dhcp_subnet_config('slaac', enable_dhcp=False))

    def test_create_network_invalid_start_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid start IP to ensure Neutron assigns it the smallest IP
        possible
        """
        self._check_default_pool(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            start='foo'))

    def test_create_network_invalid_end_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid end IP to ensure Neutron assigns it the largest IP
        possible
        """
        self._check_default_pool(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            end='bar'))

    def test_create_network_with_bad_cidr(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet CIDR is invalid
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6))

    def test_create_network_invalid_gateway_ip(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet gateway IP is invalid
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            gateway_ip='192.168.0.1'))

    def test_create_network_with_bad_dns(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the DNS IP is invalid
        """
        self._expect_bad_request(SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            dns_nameservers=['foo']))
484
485
486 class NeutronUtilsRouterTests(OSComponentTestCase):
487     """
488     Test for creating routers via neutron_utils.py
489     """
490
491     def setUp(self):
492         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
493         self.port_name = str(guid) + '-port'
494         self.neutron = neutron_utils.neutron_client(self.os_creds)
495         self.network = None
496         self.port = None
497         self.router = None
498         self.interface_router = None
499         self.net_config = openstack_tests.get_pub_net_config(
500             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
501             router_name=guid + '-pub-router', external_net=self.ext_net_name)
502
503     def tearDown(self):
504         """
505         Cleans the remote OpenStack objects
506         """
507         if self.interface_router:
508             neutron_utils.remove_interface_router(
509                 self.neutron, self.router, self.network.subnets[0])
510
511         if self.router:
512             try:
513                 neutron_utils.delete_router(self.neutron, self.router)
514                 validate_router(self.neutron, self.router.name, False)
515             except:
516                 pass
517
518         if self.port:
519             try:
520                 neutron_utils.delete_port(self.neutron, self.port)
521             except:
522                 pass
523
524         if self.network:
525             neutron_utils.delete_network(self.neutron, self.network)
526
527     def test_create_router_simple(self):
528         """
529         Tests the neutron_utils.create_router()
530         """
531         self.router = neutron_utils.create_router(
532             self.neutron, self.os_creds, self.net_config.router_settings,
533             self.project_id)
534         validate_router(self.neutron, self.net_config.router_settings.name,
535                         True)
536
537     def test_create_router_with_public_interface(self):
538         """
539         Tests the neutron_utils.create_router() function with a pubic interface
540         """
541         subnet_setting = self.net_config.network_settings.subnet_settings[0]
542         self.net_config = openstack_tests.OSNetworkConfig(
543             self.net_config.network_settings.name,
544             subnet_setting.name,
545             subnet_setting.cidr,
546             self.net_config.router_settings.name,
547             self.ext_net_name)
548         self.router = neutron_utils.create_router(
549             self.neutron, self.os_creds, self.net_config.router_settings,
550             self.project_id)
551         validate_router(self.neutron, self.net_config.router_settings.name,
552                         True)
553
554         ext_net = neutron_utils.get_network(
555             self.neutron, network_name=self.ext_net_name)
556         self.assertEqual(self.router.external_network_id, ext_net.id)
557
558     def test_add_interface_router(self):
559         """
560         Tests the neutron_utils.add_interface_router() function
561         """
562         self.network = neutron_utils.create_network(
563             self.neutron, self.os_creds, self.net_config.network_settings)
564         self.assertEqual(self.net_config.network_settings.name,
565                          self.network.name)
566         self.assertTrue(validate_network(
567             self.neutron, self.net_config.network_settings.name, True,
568             self.project_id))
569
570         subnet_setting = self.net_config.network_settings.subnet_settings[0]
571         self.assertTrue(validate_subnet(
572             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
573
574         self.router = neutron_utils.create_router(
575             self.neutron, self.os_creds, self.net_config.router_settings,
576             self.project_id)
577         validate_router(self.neutron, self.net_config.router_settings.name,
578                         True)
579
580         self.interface_router = neutron_utils.add_interface_router(
581             self.neutron, self.router, self.network.subnets[0])
582         validate_interface_router(self.interface_router, self.router,
583                                   self.network.subnets[0])
584
585     def test_add_interface_router_null_router(self):
586         """
587         Tests the neutron_utils.add_interface_router() function for an
588         Exception when the router value is None
589         """
590         self.network = neutron_utils.create_network(
591             self.neutron, self.os_creds, self.net_config.network_settings)
592         self.assertEqual(self.net_config.network_settings.name,
593                          self.network.name)
594         self.assertTrue(validate_network(
595             self.neutron, self.net_config.network_settings.name, True,
596             self.project_id))
597
598         subnet_setting = self.net_config.network_settings.subnet_settings[0]
599         self.assertTrue(validate_subnet(
600             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
601
602         with self.assertRaises(NeutronException):
603             self.interface_router = neutron_utils.add_interface_router(
604                 self.neutron, self.router, self.network.subnets[0])
605
606     def test_add_interface_router_null_subnet(self):
607         """
608         Tests the neutron_utils.add_interface_router() function for an
609         Exception when the subnet value is None
610         """
611         self.network = neutron_utils.create_network(
612             self.neutron, self.os_creds, self.net_config.network_settings)
613         self.assertEqual(self.net_config.network_settings.name,
614                          self.network.name)
615         self.assertTrue(validate_network(
616             self.neutron, self.net_config.network_settings.name, True,
617             self.project_id))
618
619         self.router = neutron_utils.create_router(
620             self.neutron, self.os_creds, self.net_config.router_settings,
621             self.project_id)
622         validate_router(self.neutron, self.net_config.router_settings.name,
623                         True)
624
625         with self.assertRaises(NeutronException):
626             self.interface_router = neutron_utils.add_interface_router(
627                 self.neutron, self.router, None)
628
629     def test_add_interface_router_missing_subnet(self):
630         """
631         Tests the neutron_utils.add_interface_router() function for an
632         Exception when the subnet object has been deleted
633         """
634         self.network = neutron_utils.create_network(
635             self.neutron, self.os_creds, self.net_config.network_settings)
636         self.assertEqual(self.net_config.network_settings.name,
637                          self.network.name)
638         self.assertTrue(validate_network(
639             self.neutron, self.net_config.network_settings.name, True,
640             self.project_id))
641
642         self.router = neutron_utils.create_router(
643             self.neutron, self.os_creds, self.net_config.router_settings,
644             self.project_id)
645         validate_router(self.neutron, self.net_config.router_settings.name,
646                         True)
647
648         for subnet in self.network.subnets:
649             neutron_utils.delete_subnet(self.neutron, subnet)
650
651         with self.assertRaises(NotFound):
652             self.interface_router = neutron_utils.add_interface_router(
653                 self.neutron, self.router, self.network.subnets[0])
654
655     def test_create_port(self):
656         """
657         Tests the neutron_utils.create_port() function
658         """
659         self.network = neutron_utils.create_network(
660             self.neutron, self.os_creds, self.net_config.network_settings)
661         self.assertEqual(self.net_config.network_settings.name,
662                          self.network.name)
663         self.assertTrue(validate_network(
664             self.neutron, self.net_config.network_settings.name, True,
665             self.project_id))
666
667         subnet_setting = self.net_config.network_settings.subnet_settings[0]
668         self.assertTrue(validate_subnet(
669             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
670
671         self.port = neutron_utils.create_port(
672             self.neutron, self.os_creds, PortConfig(
673                 name=self.port_name,
674                 ip_addrs=[{
675                     'subnet_name': subnet_setting.name,
676                     'ip': ip_1}],
677                 network_name=self.net_config.network_settings.name))
678         validate_port(self.neutron, self.port, self.port_name)
679
680     def test_create_port_empty_name(self):
681         """
682         Tests the neutron_utils.create_port() function
683         """
684         self.network = neutron_utils.create_network(
685             self.neutron, self.os_creds, self.net_config.network_settings)
686         self.assertEqual(self.net_config.network_settings.name,
687                          self.network.name)
688         self.assertTrue(validate_network(
689             self.neutron, self.net_config.network_settings.name, True,
690             self.project_id))
691
692         subnet_setting = self.net_config.network_settings.subnet_settings[0]
693         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
694                                         subnet_setting.cidr, True))
695
696         self.port = neutron_utils.create_port(
697             self.neutron, self.os_creds, PortConfig(
698                 name=self.port_name,
699                 network_name=self.net_config.network_settings.name,
700                 ip_addrs=[{
701                     'subnet_name': subnet_setting.name,
702                     'ip': ip_1}]))
703         validate_port(self.neutron, self.port, self.port_name)
704
705     def test_create_port_null_name(self):
706         """
707         Tests the neutron_utils.create_port() when the port name value is None
708         """
709         self.network = neutron_utils.create_network(
710             self.neutron, self.os_creds, self.net_config.network_settings)
711         self.assertEqual(self.net_config.network_settings.name,
712                          self.network.name)
713         self.assertTrue(validate_network(
714             self.neutron, self.net_config.network_settings.name, True,
715             self.project_id))
716
717         subnet_setting = self.net_config.network_settings.subnet_settings[0]
718         self.assertTrue(validate_subnet(
719             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
720
721         self.port = neutron_utils.create_port(
722             self.neutron, self.os_creds,
723             PortConfig(
724                 network_name=self.net_config.network_settings.name,
725                 ip_addrs=[{
726                     'subnet_name': subnet_setting.name,
727                     'ip': ip_1}]))
728
729         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
730         self.assertEqual(self.port, port)
731
732     def test_create_port_null_network_object(self):
733         """
734         Tests the neutron_utils.create_port() function for an Exception when
735         the network object is None
736         """
737         with self.assertRaises(Exception):
738             self.port = neutron_utils.create_port(
739                 self.neutron, self.os_creds,
740                 PortConfig(
741                     name=self.port_name,
742                     network_name=self.net_config.network_settings.name,
743                     ip_addrs=[{
744                         'subnet_name':
745                             self.net_config.network_settings.subnet_settings[
746                                 0].name,
747                         'ip': ip_1}]))
748
749     def test_create_port_null_ip(self):
750         """
751         Tests the neutron_utils.create_port() function for an Exception when
752         the IP value is None
753         """
754         self.network = neutron_utils.create_network(
755             self.neutron, self.os_creds, self.net_config.network_settings)
756         self.assertEqual(self.net_config.network_settings.name,
757                          self.network.name)
758         self.assertTrue(validate_network(
759             self.neutron, self.net_config.network_settings.name, True,
760             self.project_id))
761
762         subnet_setting = self.net_config.network_settings.subnet_settings[0]
763         self.assertTrue(validate_subnet(
764             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
765
766         with self.assertRaises(Exception):
767             self.port = neutron_utils.create_port(
768                 self.neutron, self.os_creds,
769                 PortConfig(
770                     name=self.port_name,
771                     network_name=self.net_config.network_settings.name,
772                     ip_addrs=[{
773                         'subnet_name': subnet_setting.name,
774                         'ip': None}]))
775
776     def test_create_port_invalid_ip(self):
777         """
778         Tests the neutron_utils.create_port() function for an Exception when
779         the IP value is None
780         """
781         self.network = neutron_utils.create_network(
782             self.neutron, self.os_creds, self.net_config.network_settings)
783         self.assertEqual(self.net_config.network_settings.name,
784                          self.network.name)
785         self.assertTrue(validate_network(
786             self.neutron, self.net_config.network_settings.name, True,
787             self.project_id))
788
789         subnet_setting = self.net_config.network_settings.subnet_settings[0]
790         self.assertTrue(validate_subnet(
791             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
792
793         with self.assertRaises(Exception):
794             self.port = neutron_utils.create_port(
795                 self.neutron, self.os_creds,
796                 PortConfig(
797                     name=self.port_name,
798                     network_name=self.net_config.network_settings.name,
799                     ip_addrs=[{
800                         'subnet_name': subnet_setting.name,
801                         'ip': 'foo'}]))
802
803     def test_create_port_invalid_ip_to_subnet(self):
804         """
805         Tests the neutron_utils.create_port() function for an Exception when
806         the IP value is None
807         """
808         self.network = neutron_utils.create_network(
809             self.neutron, self.os_creds, self.net_config.network_settings)
810         self.assertEqual(self.net_config.network_settings.name,
811                          self.network.name)
812         self.assertTrue(validate_network(
813             self.neutron, self.net_config.network_settings.name, True,
814             self.project_id))
815
816         subnet_setting = self.net_config.network_settings.subnet_settings[0]
817         self.assertTrue(validate_subnet(
818             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
819
820         with self.assertRaises(Exception):
821             self.port = neutron_utils.create_port(
822                 self.neutron, self.os_creds,
823                 PortConfig(
824                     name=self.port_name,
825                     network_name=self.net_config.network_settings.name,
826                     ip_addrs=[{
827                         'subnet_name': subnet_setting.name,
828                         'ip': '10.197.123.100'}]))
829
830
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron and keystone clients, generates a unique security
        group name and initializes the lists of objects tearDown() removes
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: the group may already have been
                # deleted by the test itself. Only Exception is caught so
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(
            self.neutron, self.keystone, sec_grp_settings, self.project_id)
        # Register for cleanup so the group is not leaked should one of the
        # assertions below fail before the explicit delete is reached
        self.security_groups.append(security_group)

        # assertEqual is required here: the second argument to assertTrue is
        # only the failure message, so it would never compare the names
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupConfig constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupConfig()
            self.security_groups.append(
                neutron_utils.create_security_group(
                    self.neutron, self.keystone, sec_grp_settings,
                    self.project_id))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function for a group
        that has a description but no rule settings
        """
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings,
                self.project_id))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() function for a group
        configured with one ingress rule and then adds that rule again via
        neutron_utils.create_security_group_rule()
        """

        sec_grp_rule_settings = SecurityGroupRuleConfig(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupConfig(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(
                self.neutron, self.keystone, sec_grp_settings,
                self.project_id))

        # Track the rules that already exist on the group so that tearDown()
        # removes them
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        self.security_group_rules.extend(free_rules)

        # Reuse the keystone client created in setUp() from the same
        # credentials instead of building a second one
        # NOTE(review): the value returned by get_project() is passed on as
        # the project ID - confirm it is what create_security_group_rule()
        # expects
        project_id = keystone_utils.get_project(
            self.keystone, self.os_creds.project_name)
        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, sec_grp_settings.rule_settings[0], project_id))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(self.neutron,
                                                          security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        # assertEqual is required here: the second argument to assertTrue is
        # only the failure message, so it would never compare the names
        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function by
        creating two security groups and looking each one up by its ID
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-1', description='hello group'),
            self.project_id))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupConfig(
                name=self.sec_grp_name + '-2', description='hello group'),
            self.project_id))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
981
982
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions of neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and initializes the floating IP reference
        that tearDown() uses for cleanup
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if there is one
        """
        if self.floating_ip:
            try:
                neutron_utils.delete_floating_ip(
                    self.neutron, self.floating_ip)
            except Exception:
                # Best-effort cleanup; only Exception is caught so that
                # KeyboardInterrupt/SystemExit are no longer swallowed
                pass

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP on the network named by
        self.ext_net_name, that the number of floating IPs grows by one and
        that the new floating IP can be retrieved with the same ID and IP
        :return:
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
                                                            self.ext_net_name)
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
1022
1023
1024 """
1025 Validation routines
1026 """
1027
1028
def validate_network(neutron, name, exists, project_id):
    """
    Checks that the presence of a network matches the caller's expectation.
    Returns True when the network is found and exists is true, or when the
    network is not found and exists is false. Returns False otherwise.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :param project_id: the associated project ID
    :return: True/False
    """
    found = neutron_utils.get_network(
        neutron, network_name=name, project_id=project_id)
    # The expectation is met only when both values have the same truthiness
    return bool(found) == bool(exists)
1047
1048
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks that the presence of a subnet matches the caller's expectation.
    When exists is true, returns True only if a subnet with the given name
    is found and its CIDR equals the expected one. When exists is false,
    returns True only if no subnet is found.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not the subnet is expected to exist
    :return: True/False
    """
    found = neutron_utils.get_subnet(neutron, subnet_name=name)

    if not exists:
        # Absence was expected
        return not found

    if found and found.name == name:
        return found.cidr == cidr
    return False
1066
1067
def validate_router(neutron, name, exists):
    """
    Checks that the presence of a router matches the caller's expectation.
    Returns True when the router is found and exists is true, or when the
    router is not found and exists is false. Returns False otherwise.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router is expected to exist
    :return: True/False
    """
    router = neutron_utils.get_router(neutron, router_name=name)
    if exists and router:
        return True
    # Previously the "expected to be absent" case always returned False,
    # unlike validate_network(); an absent router now satisfies exists=False
    if not exists and not router:
        return True
    return False
1082
1083
def validate_interface_router(interface_router, router, subnet):
    """
    Checks that the IDs held by the interface router object match the given
    subnet and router
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    # NOTE(review): the router's ID is compared against the interface
    # router's port_id attribute - confirm this is the intended field
    expected_subnet_id, expected_router_id = (
        interface_router.subnet_id, interface_router.port_id)

    if subnet.id != expected_subnet_id:
        return False
    return router.id == expected_router_id
1097
1098
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up a port by ID in the result of neutron.list_ports() and checks
    its name. Returns True when a port with the same ID as port_obj is found
    and its name equals this_port_name. Returns False when the name differs
    or when no port with that ID is found.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup
    :param this_port_name: The expected port name
    :return: True/False
    """
    os_ports = neutron.list_ports()
    # Only the lists of port dictionaries are needed, not the keys they are
    # stored under
    for os_port_insts in os_ports.values():
        for os_inst in os_port_insts:
            if os_inst['id'] == port_obj.id:
                return os_inst['name'] == this_port_name
    return False