Refactoring of NetworkSettings to extend NetworkConfig
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
20 from snaps.openstack.create_security_group import (
21     SecurityGroupSettings,  SecurityGroupRuleSettings, Direction)
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client built from credentials can
    talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Lists the networks with a client built from self.os_creds and expects
        one of them to be named self.ext_net_name.
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        listing = neutron.list_networks()

        net_names = [net.get('name') for net in listing.get('networks')]
        self.assertTrue(self.ext_net_name in net_names)

    def test_neutron_connect_fail(self):
        """
        Expects an Exception when listing networks with bogus credentials.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Credential construction, client creation and the API call all stay
        # inside the context manager so a failure at any step satisfies it
        with self.assertRaises(Exception):
            bad_creds = OSCreds(username='user', password='pass',
                                auth_url='url', project_name='project')
            neutron_utils.neutron_client(bad_creds).list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Expects neutron_utils.get_external_networks() to return a network
        whose name equals the configured self.ext_net_name
        :return:
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        matched = any(
            ext_net.name == self.ext_net_name
            for ext_net in neutron_utils.get_external_networks(neutron))
        self.assertTrue(matched)
81
82
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Exercises network creation through neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a uniquely named network configuration
        """
        guid = '{}-{}'.format(self.__class__.__name__, uuid.uuid4())
        self.port_name = guid + '-port'
        self.network = None
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Deletes the network recorded by the test, if any
        """
        if not self.network:
            return
        neutron_utils.delete_network(self.neutron, self.network)

    def test_create_network(self):
        """
        Creates a network with neutron_utils.create_network() and checks its
        name, its existence, and its subnet count against the settings
        """
        net_settings = self.net_config.network_settings

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))
        self.assertEqual(len(net_settings.subnet_settings),
                         len(self.network.subnets))

    def test_create_network_empty_name(self):
        """
        Expects an Exception when the network name is an empty string
        """
        with self.assertRaises(Exception):
            # The config object is built inside the context manager so that
            # a failure at construction time also satisfies the expectation
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig(name=''))

    def test_create_network_null_name(self):
        """
        Expects an Exception when no network name is supplied
        """
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=NetworkConfig())
135
136
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a uniquely named network/subnet
        configuration
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup. Only Exception is swallowed so that
                # KeyboardInterrupt/SystemExit still propagate.
                pass

    def _create_and_check_network(self):
        """
        Creates the configured network, stores it on self.network and checks
        its name and its existence
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(validate_network(
            self.neutron, net_settings.name, True))

    def _check_subnet_queries(self, subnet_setting):
        """
        Checks that looking the subnet up by name and by network both return
        the single subnet held by self.network
        :param subnet_setting: the settings the subnet was created from
        """
        subnet_query1 = neutron_utils.get_subnet(
            self.neutron, subnet_name=subnet_setting.name)
        self.assertEqual(self.network.subnets[0], subnet_query1)

        subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
                                                             self.network)
        self.assertIsNotNone(subnet_query2)
        self.assertEqual(1, len(subnet_query2))
        self.assertEqual(self.network.subnets[0], subnet_query2[0])

    def test_create_subnet(self):
        """
        Tests that neutron_utils.create_network() also creates the configured
        subnet and that the subnet can be queried by name and by network
        """
        self._create_and_check_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))

        self._check_subnet_queries(subnet_setting)

    def test_create_subnet_null_name(self):
        """
        Tests that constructing a SubnetConfig without a name raises an
        Exception
        """
        self._create_and_check_network()

        with self.assertRaises(Exception):
            SubnetConfig(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Tests that, after neutron_utils.create_network(), the subnet is
        found under its configured name but not under an empty name
        """
        self._create_and_check_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.assertTrue(validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True))
        self.assertFalse(validate_subnet(
            self.neutron, '', subnet_setting.cidr, True))

        self._check_subnet_queries(subnet_setting)

    def test_create_subnet_null_cidr(self):
        """
        Tests neutron_utils.create_network() for an Exception when the
        subnet CIDR value is None
        """
        self.net_config.network_settings.subnet_settings[0].cidr = None
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)

    def test_create_subnet_empty_cidr(self):
        """
        Tests neutron_utils.create_network() for an Exception when the
        subnet CIDR value is empty
        """
        self.net_config.network_settings.subnet_settings[0].cidr = ''
        with self.assertRaises(Exception):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.net_config.network_settings)
248
249
class NeutronUtilsIPv6Tests(OSComponentTestCase):
    """
    Test for creating IPv6 networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a unique name prefix for the test
        """
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            try:
                neutron_utils.delete_network(self.neutron, self.network)
            except Exception:
                # Best-effort cleanup. Only Exception is swallowed so that
                # KeyboardInterrupt/SystemExit still propagate.
                pass

    def _set_network_settings(self, sub_setting):
        """
        Stores on self.network_settings a NetworkConfig holding the given
        subnet as its only subnet
        :param sub_setting: the SubnetConfig for the network's subnet
        """
        self.network_settings = NetworkConfig(
            name=self.guid + '-net', subnet_settings=[sub_setting])

    def _check_dhcp_network(self, ipv6_mode):
        """
        Creates a network with one DHCP-enabled IPv6 subnet whose RA and
        address modes are both ipv6_mode and checks the created subnet
        against the settings
        :param ipv6_mode: value for both ipv6_ra_mode and ipv6_address_mode
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
            ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=True, ipv6_ra_mode=ipv6_mode,
            ipv6_address_mode=ipv6_mode)
        self._set_network_settings(sub_setting)

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)
        self.assertEqual(self.network_settings.name, self.network.name)

        subnet_settings = self.network_settings.subnet_settings[0]
        self.assertEqual(1, len(self.network.subnets))
        subnet = self.network.subnets[0]

        self.assertEqual(self.network.id, subnet.network_id)
        self.assertEqual(subnet_settings.name, subnet.name)
        self.assertEqual(subnet_settings.start, subnet.start)
        self.assertEqual(subnet_settings.end, subnet.end)
        # The expected CIDR is the compressed form of the one configured
        self.assertEqual('1:1::/64', subnet.cidr)
        self.assertEqual(6, subnet.ip_version)
        self.assertEqual(1, len(subnet.dns_nameservers))
        self.assertEqual(
            sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
        self.assertTrue(subnet.enable_dhcp)
        self.assertEqual(
            subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
        self.assertEqual(
            subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)

    def test_create_network_slaac(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are slaac
        """
        self._check_dhcp_network('slaac')

    def test_create_network_stateful(self):
        """
        Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
        is True and IPv6 modes are stateful
        """
        self._check_dhcp_network('dhcpv6-stateful')

    def test_create_network_stateless(self):
        """
        Tests the neutron_utils.create_network() when DHCP is enabled and
        the RA and address modes are both 'dhcpv6-stateless'
        """
        self._check_dhcp_network('dhcpv6-stateless')

    def test_create_network_no_dhcp_slaac(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        DHCP is not enabled and the RA and address modes are both 'slaac'
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
            ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
            gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
            enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
        self._set_network_settings(sub_setting)

        with self.assertRaises(BadRequest):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.network_settings)

    def test_create_network_invalid_start_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid start IP to ensure Neutron assigns it the smallest IP
        possible
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            start='foo')
        self._set_network_settings(sub_setting)

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_invalid_end_ip(self):
        """
        Tests the neutron_utils.create_network() that contains one IPv6 subnet
        with an invalid end IP to ensure Neutron assigns it the largest IP
        possible
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            end='bar')
        self._set_network_settings(sub_setting)

        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, self.network_settings)

        self.assertEqual('1:1::2', self.network.subnets[0].start)
        self.assertEqual(
            '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)

    def test_create_network_with_bad_cidr(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet CIDR is invalid
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
        self._set_network_settings(sub_setting)

        with self.assertRaises(BadRequest):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.network_settings)

    def test_create_network_invalid_gateway_ip(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the subnet gateway IP is invalid
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            gateway_ip='1:2::1')
        self._set_network_settings(sub_setting)

        with self.assertRaises(BadRequest):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.network_settings)

    def test_create_network_with_bad_dns(self):
        """
        Tests the neutron_utils.create_network() for a BadRequest when
        the DNS IP is invalid
        """
        sub_setting = SubnetConfig(
            name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
            dns_nameservers=['foo'])
        self._set_network_settings(sub_setting)

        with self.assertRaises(BadRequest):
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, self.network_settings)
480
481
482 class NeutronUtilsRouterTests(OSComponentTestCase):
483     """
484     Test for creating routers via neutron_utils.py
485     """
486
487     def setUp(self):
488         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
489         self.port_name = str(guid) + '-port'
490         self.neutron = neutron_utils.neutron_client(self.os_creds)
491         self.network = None
492         self.port = None
493         self.router = None
494         self.interface_router = None
495         self.net_config = openstack_tests.get_pub_net_config(
496             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
497             router_name=guid + '-pub-router', external_net=self.ext_net_name)
498
499     def tearDown(self):
500         """
501         Cleans the remote OpenStack objects
502         """
503         if self.interface_router:
504             neutron_utils.remove_interface_router(
505                 self.neutron, self.router, self.network.subnets[0])
506
507         if self.router:
508             try:
509                 neutron_utils.delete_router(self.neutron, self.router)
510                 validate_router(self.neutron, self.router.name, False)
511             except:
512                 pass
513
514         if self.port:
515             try:
516                 neutron_utils.delete_port(self.neutron, self.port)
517             except:
518                 pass
519
520         if self.network:
521             neutron_utils.delete_network(self.neutron, self.network)
522
523     def test_create_router_simple(self):
524         """
525         Tests the neutron_utils.create_router()
526         """
527         self.router = neutron_utils.create_router(
528             self.neutron, self.os_creds, self.net_config.router_settings)
529         validate_router(self.neutron, self.net_config.router_settings.name,
530                         True)
531
532     def test_create_router_with_public_interface(self):
533         """
534         Tests the neutron_utils.create_router() function with a pubic interface
535         """
536         subnet_setting = self.net_config.network_settings.subnet_settings[0]
537         self.net_config = openstack_tests.OSNetworkConfig(
538             self.net_config.network_settings.name,
539             subnet_setting.name,
540             subnet_setting.cidr,
541             self.net_config.router_settings.name,
542             self.ext_net_name)
543         self.router = neutron_utils.create_router(
544             self.neutron, self.os_creds, self.net_config.router_settings)
545         validate_router(self.neutron, self.net_config.router_settings.name,
546                         True)
547
548         ext_net = neutron_utils.get_network(
549             self.neutron, network_name=self.ext_net_name)
550         self.assertEqual(self.router.external_network_id, ext_net.id)
551
552     def test_add_interface_router(self):
553         """
554         Tests the neutron_utils.add_interface_router() function
555         """
556         self.network = neutron_utils.create_network(
557             self.neutron, self.os_creds, self.net_config.network_settings)
558         self.assertEqual(self.net_config.network_settings.name,
559                          self.network.name)
560         self.assertTrue(validate_network(
561             self.neutron, self.net_config.network_settings.name, True))
562
563         subnet_setting = self.net_config.network_settings.subnet_settings[0]
564         self.assertTrue(validate_subnet(
565             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
566
567         self.router = neutron_utils.create_router(
568             self.neutron, self.os_creds, self.net_config.router_settings)
569         validate_router(self.neutron, self.net_config.router_settings.name,
570                         True)
571
572         self.interface_router = neutron_utils.add_interface_router(
573             self.neutron, self.router, self.network.subnets[0])
574         validate_interface_router(self.interface_router, self.router,
575                                   self.network.subnets[0])
576
577     def test_add_interface_router_null_router(self):
578         """
579         Tests the neutron_utils.add_interface_router() function for an
580         Exception when the router value is None
581         """
582         self.network = neutron_utils.create_network(
583             self.neutron, self.os_creds, self.net_config.network_settings)
584         self.assertEqual(self.net_config.network_settings.name,
585                          self.network.name)
586         self.assertTrue(validate_network(
587             self.neutron, self.net_config.network_settings.name, True))
588
589         subnet_setting = self.net_config.network_settings.subnet_settings[0]
590         self.assertTrue(validate_subnet(
591             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
592
593         with self.assertRaises(NeutronException):
594             self.interface_router = neutron_utils.add_interface_router(
595                 self.neutron, self.router, self.network.subnets[0])
596
597     def test_add_interface_router_null_subnet(self):
598         """
599         Tests the neutron_utils.add_interface_router() function for an
600         Exception when the subnet value is None
601         """
602         self.network = neutron_utils.create_network(
603             self.neutron, self.os_creds, self.net_config.network_settings)
604         self.assertEqual(self.net_config.network_settings.name,
605                          self.network.name)
606         self.assertTrue(validate_network(
607             self.neutron, self.net_config.network_settings.name, True))
608
609         self.router = neutron_utils.create_router(
610             self.neutron, self.os_creds, self.net_config.router_settings)
611         validate_router(self.neutron, self.net_config.router_settings.name,
612                         True)
613
614         with self.assertRaises(NeutronException):
615             self.interface_router = neutron_utils.add_interface_router(
616                 self.neutron, self.router, None)
617
618     def test_add_interface_router_missing_subnet(self):
619         """
620         Tests the neutron_utils.add_interface_router() function for an
621         Exception when the subnet object has been deleted
622         """
623         self.network = neutron_utils.create_network(
624             self.neutron, self.os_creds, self.net_config.network_settings)
625         self.assertEqual(self.net_config.network_settings.name,
626                          self.network.name)
627         self.assertTrue(validate_network(
628             self.neutron, self.net_config.network_settings.name, True))
629
630         self.router = neutron_utils.create_router(
631             self.neutron, self.os_creds, self.net_config.router_settings)
632         validate_router(self.neutron, self.net_config.router_settings.name,
633                         True)
634
635         for subnet in self.network.subnets:
636             neutron_utils.delete_subnet(self.neutron, subnet)
637
638         with self.assertRaises(NotFound):
639             self.interface_router = neutron_utils.add_interface_router(
640                 self.neutron, self.router, self.network.subnets[0])
641
642     def test_create_port(self):
643         """
644         Tests the neutron_utils.create_port() function
645         """
646         self.network = neutron_utils.create_network(
647             self.neutron, self.os_creds, self.net_config.network_settings)
648         self.assertEqual(self.net_config.network_settings.name,
649                          self.network.name)
650         self.assertTrue(validate_network(
651             self.neutron, self.net_config.network_settings.name, True))
652
653         subnet_setting = self.net_config.network_settings.subnet_settings[0]
654         self.assertTrue(validate_subnet(
655             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
656
657         self.port = neutron_utils.create_port(
658             self.neutron, self.os_creds, PortConfig(
659                 name=self.port_name,
660                 ip_addrs=[{
661                     'subnet_name': subnet_setting.name,
662                     'ip': ip_1}],
663                 network_name=self.net_config.network_settings.name))
664         validate_port(self.neutron, self.port, self.port_name)
665
666     def test_create_port_empty_name(self):
667         """
668         Tests the neutron_utils.create_port() function
669         """
670         self.network = neutron_utils.create_network(
671             self.neutron, self.os_creds, self.net_config.network_settings)
672         self.assertEqual(self.net_config.network_settings.name,
673                          self.network.name)
674         self.assertTrue(validate_network(
675             self.neutron, self.net_config.network_settings.name, True))
676
677         subnet_setting = self.net_config.network_settings.subnet_settings[0]
678         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
679                                         subnet_setting.cidr, True))
680
681         self.port = neutron_utils.create_port(
682             self.neutron, self.os_creds, PortConfig(
683                 name=self.port_name,
684                 network_name=self.net_config.network_settings.name,
685                 ip_addrs=[{
686                     'subnet_name': subnet_setting.name,
687                     'ip': ip_1}]))
688         validate_port(self.neutron, self.port, self.port_name)
689
690     def test_create_port_null_name(self):
691         """
692         Tests the neutron_utils.create_port() when the port name value is None
693         """
694         self.network = neutron_utils.create_network(
695             self.neutron, self.os_creds, self.net_config.network_settings)
696         self.assertEqual(self.net_config.network_settings.name,
697                          self.network.name)
698         self.assertTrue(validate_network(
699             self.neutron, self.net_config.network_settings.name, True))
700
701         subnet_setting = self.net_config.network_settings.subnet_settings[0]
702         self.assertTrue(validate_subnet(
703             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
704
705         self.port = neutron_utils.create_port(
706             self.neutron, self.os_creds,
707             PortConfig(
708                 network_name=self.net_config.network_settings.name,
709                 ip_addrs=[{
710                     'subnet_name': subnet_setting.name,
711                     'ip': ip_1}]))
712
713         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
714         self.assertEqual(self.port, port)
715
716     def test_create_port_null_network_object(self):
717         """
718         Tests the neutron_utils.create_port() function for an Exception when
719         the network object is None
720         """
721         with self.assertRaises(Exception):
722             self.port = neutron_utils.create_port(
723                 self.neutron, self.os_creds,
724                 PortConfig(
725                     name=self.port_name,
726                     network_name=self.net_config.network_settings.name,
727                     ip_addrs=[{
728                         'subnet_name':
729                             self.net_config.network_settings.subnet_settings[
730                                 0].name,
731                         'ip': ip_1}]))
732
733     def test_create_port_null_ip(self):
734         """
735         Tests the neutron_utils.create_port() function for an Exception when
736         the IP value is None
737         """
738         self.network = neutron_utils.create_network(
739             self.neutron, self.os_creds, self.net_config.network_settings)
740         self.assertEqual(self.net_config.network_settings.name,
741                          self.network.name)
742         self.assertTrue(validate_network(
743             self.neutron, self.net_config.network_settings.name, True))
744
745         subnet_setting = self.net_config.network_settings.subnet_settings[0]
746         self.assertTrue(validate_subnet(
747             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
748
749         with self.assertRaises(Exception):
750             self.port = neutron_utils.create_port(
751                 self.neutron, self.os_creds,
752                 PortConfig(
753                     name=self.port_name,
754                     network_name=self.net_config.network_settings.name,
755                     ip_addrs=[{
756                         'subnet_name': subnet_setting.name,
757                         'ip': None}]))
758
759     def test_create_port_invalid_ip(self):
760         """
761         Tests the neutron_utils.create_port() function for an Exception when
762         the IP value is None
763         """
764         self.network = neutron_utils.create_network(
765             self.neutron, self.os_creds, self.net_config.network_settings)
766         self.assertEqual(self.net_config.network_settings.name,
767                          self.network.name)
768         self.assertTrue(validate_network(
769             self.neutron, self.net_config.network_settings.name, True))
770
771         subnet_setting = self.net_config.network_settings.subnet_settings[0]
772         self.assertTrue(validate_subnet(
773             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
774
775         with self.assertRaises(Exception):
776             self.port = neutron_utils.create_port(
777                 self.neutron, self.os_creds,
778                 PortConfig(
779                     name=self.port_name,
780                     network_name=self.net_config.network_settings.name,
781                     ip_addrs=[{
782                         'subnet_name': subnet_setting.name,
783                         'ip': 'foo'}]))
784
785     def test_create_port_invalid_ip_to_subnet(self):
786         """
787         Tests the neutron_utils.create_port() function for an Exception when
788         the IP value is None
789         """
790         self.network = neutron_utils.create_network(
791             self.neutron, self.os_creds, self.net_config.network_settings)
792         self.assertEqual(self.net_config.network_settings.name,
793                          self.network.name)
794         self.assertTrue(validate_network(
795             self.neutron, self.net_config.network_settings.name, True))
796
797         subnet_setting = self.net_config.network_settings.subnet_settings[0]
798         self.assertTrue(validate_subnet(
799             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
800
801         with self.assertRaises(Exception):
802             self.port = neutron_utils.create_port(
803                 self.neutron, self.os_creds,
804                 PortConfig(
805                     name=self.port_name,
806                     network_name=self.net_config.network_settings.name,
807                     ip_addrs=[{
808                         'subnet_name': subnet_setting.name,
809                         'ip': '10.197.123.100'}]))
810
811
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Test for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Generates a unique security group name, creates the neutron and
        keystone clients and initializes the lists of objects to clean up
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        self.security_groups = list()
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        for security_group in self.security_groups:
            try:
                neutron_utils.delete_security_group(self.neutron,
                                                    security_group)
            except Exception:
                # Best-effort cleanup: a failed delete must not mask the test
                # result, but KeyboardInterrupt/SystemExit still propagate
                pass

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        security_group = neutron_utils.create_security_group(self.neutron,
                                                             self.keystone,
                                                             sec_grp_settings)
        # Registered for tearDown so the group does not leak should one of
        # the assertions below fail before the explicit delete
        self.security_groups.append(security_group)

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            security_group, sec_grp_get))

        neutron_utils.delete_security_group(self.neutron, security_group)
        # Already deleted, so tearDown does not need to try again
        self.security_groups.remove(security_group)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNone(sec_grp_get)

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and
        neutron_utils.create_security_group() function to ensure that
        attempting to create a security group without a name will raise an
        exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_groups.append(
                neutron_utils.create_security_group(self.neutron,
                                                    self.keystone,
                                                    sec_grp_settings))

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function with a
        description but no rule settings
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
                                                 description='hello group')
        self.security_groups.append(
            neutron_utils.create_security_group(self.neutron, self.keystone,
                                                sec_grp_settings))

        self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(self.security_groups[0], sec_grp_get)

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and
        neutron_utils.create_security_group_rule() functions with a single
        ingress rule
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(
            sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(
            name=self.sec_grp_name, description='hello group',
            rule_settings=[sec_grp_rule_settings])

        self.security_groups.append(
            neutron_utils.create_security_group(self.neutron, self.keystone,
                                                sec_grp_settings))
        # Rules already attached to the new group are tracked as well so that
        # they are part of the comparison and of the cleanup
        free_rules = neutron_utils.get_rules_by_security_group(
            self.neutron, self.security_groups[0])
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(
                self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        security_group = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)

        rules = neutron_utils.get_rules_by_security_group(self.neutron,
                                                          security_group)

        self.assertTrue(
            validation_utils.objects_equivalent(
                self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, security_group.name)

        sec_grp_get = neutron_utils.get_security_group(
            self.neutron, sec_grp_settings=sec_grp_settings)
        self.assertIsNotNone(sec_grp_get)
        self.assertEqual(security_group, sec_grp_get)

    def test_get_sec_grp_by_id(self):
        """
        Tests the neutron_utils.get_security_group_by_id() function against
        two security groups created by this test
        """

        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-1',
                                  description='hello group')))
        self.security_groups.append(neutron_utils.create_security_group(
            self.neutron, self.keystone,
            SecurityGroupSettings(name=self.sec_grp_name + '-2',
                                  description='hello group')))

        sec_grp_1b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[0].id)
        sec_grp_2b = neutron_utils.get_security_group_by_id(
            self.neutron, self.security_groups[1].id)

        self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
        self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
956
957
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Test basic floating IP functionality via neutron_utils.py
    """

    def setUp(self):
        """
        Creates the neutron client and initializes the floating IP reference
        that tearDown uses for cleanup
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if any
        """
        if self.floating_ip:
            try:
                neutron_utils.delete_floating_ip(
                    self.neutron, self.floating_ip)
            except Exception:
                # Best-effort cleanup: a failed delete must not mask the test
                # result, but KeyboardInterrupt/SystemExit still propagate
                pass

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP
        :return:
        """
        initial_fips = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
                                                            self.ext_net_name)
        # Exactly one additional floating IP is expected to be listed
        all_fips = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(initial_fips) + 1, len(all_fips))
        returned = neutron_utils.get_floating_ip(self.neutron,
                                                 self.floating_ip)
        self.assertEqual(self.floating_ip.id, returned.id)
        self.assertEqual(self.floating_ip.ip, returned.ip)
997
998
999 """
1000 Validation routines
1001 """
1002
1003
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The network name to look up
    :param exists: Truthy when the network is expected to be found, falsy
                   when it is expected to be absent
    :return: True when the expectation is met else False
    """
    found = bool(neutron_utils.get_network(neutron, network_name=name))
    return found if exists else not found
1020
1021
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the
    expectation expressed by the exists parameter. When the subnet is
    expected to exist, its name and CIDR must also match.
    :param neutron: The neutron client
    :param name: The subnet name to look up
    :param cidr: The expected CIDR value (only checked when exists is truthy)
    :param exists: Truthy when the subnet is expected to be found, falsy when
                   it is expected to be absent
    :return: True when the expectation is met else False
    """
    subnet = neutron_utils.get_subnet(neutron, subnet_name=name)

    # Absence check: nothing but the lookup result matters
    if not exists:
        return not subnet

    # Presence check: the subnet must be found under the requested name...
    if not subnet or subnet.name != name:
        return False

    # ...and carry the expected CIDR
    return subnet.cidr == cidr
1039
1040
def validate_router(neutron, name, exists):
    """
    Returns true if a router for a given name DOES NOT exist if the exists
    parameter is false conversely true. Returns false if a router for a given
    name DOES exist if the exists parameter is false conversely true.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router name should exist or not
    :return: True/False
    """
    router_found = bool(neutron_utils.get_router(neutron, router_name=name))
    if exists:
        return router_found
    # Previously this case always yielded False, which contradicted the
    # documented contract and the behavior of validate_network()
    return not router_found
1055
1056
def validate_interface_router(interface_router, router, subnet):
    """
    Checks that the IDs held by the interface router object match the given
    subnet and router
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: object whose id is compared with interface_router.port_id
    :param subnet: object whose id is compared with interface_router.subnet_id
    :return: True if both IDs match else False
    """
    expected_subnet_id = interface_router.subnet_id
    # NOTE(review): the router's ID is compared with the interface's port_id
    # attribute - confirm this is the attribute that carries the router ID
    expected_router_id = interface_router.port_id

    if subnet.id != expected_subnet_id:
        return False
    return router.id == expected_router_id
1070
1071
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks up the given port by ID among the ports listed by neutron and
    checks its name. Only the first entry with a matching ID is considered.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (its id attribute is used)
    :param this_port_name: The expected port name
    :return: True if the port is listed and has the expected name else False
    """
    # list_ports() is consumed as a dict whose values are lists of port dicts
    for listed_ports in neutron.list_ports().values():
        for listed_port in listed_ports:
            if listed_port['id'] != port_obj.id:
                continue
            return listed_port['name'] == this_port_name
    return False