Merge "Apply upper-constraints when testing SNAPS"
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from neutronclient.common.exceptions import NotFound, BadRequest
18
19 from snaps.openstack import create_router
20 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
21     PortSettings
22 from snaps.openstack.create_security_group import SecurityGroupSettings, \
23     SecurityGroupRuleSettings, Direction
24 from snaps.openstack.tests import openstack_tests
25 from snaps.openstack.tests import validation_utils
26 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
27 from snaps.openstack.utils import keystone_utils
28 from snaps.openstack.utils import neutron_utils
29 from snaps.openstack.utils.neutron_utils import NeutronException
30
31 __author__ = 'spisarski'
32
33 ip_1 = '10.55.1.100'
34 ip_2 = '10.55.1.200'
35
36
37 class NeutronSmokeTests(OSComponentTestCase):
38     """
39     Tests to ensure that the neutron client can communicate with the cloud
40     """
41
42     def test_neutron_connect_success(self):
43         """
44         Tests to ensure that the proper credentials can connect.
45         """
46         neutron = neutron_utils.neutron_client(self.os_creds)
47
48         networks = neutron.list_networks()
49
50         found = False
51         networks = networks.get('networks')
52         for network in networks:
53             if network.get('name') == self.ext_net_name:
54                 found = True
55         self.assertTrue(found)
56
57     def test_neutron_connect_fail(self):
58         """
59         Tests to ensure that the improper credentials cannot connect.
60         """
61         from snaps.openstack.os_credentials import OSCreds
62
63         with self.assertRaises(Exception):
64             neutron = neutron_utils.neutron_client(
65                 OSCreds(username='user', password='pass', auth_url='url',
66                         project_name='project'))
67             neutron.list_networks()
68
69     def test_retrieve_ext_network_name(self):
70         """
71         Tests the neutron_utils.get_external_network_names to ensure the
72         configured self.ext_net_name is contained within the returned list
73         :return:
74         """
75         neutron = neutron_utils.neutron_client(self.os_creds)
76         ext_networks = neutron_utils.get_external_networks(neutron)
77         found = False
78         for network in ext_networks:
79             if network.name == self.ext_net_name:
80                 found = True
81                 break
82         self.assertTrue(found)
83
84
85 class NeutronUtilsNetworkTests(OSComponentTestCase):
86     """
87     Test for creating networks via neutron_utils.py
88     """
89
90     def setUp(self):
91         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
92         self.port_name = str(guid) + '-port'
93         self.neutron = neutron_utils.neutron_client(self.os_creds)
94         self.network = None
95         self.net_config = openstack_tests.get_pub_net_config(
96             net_name=guid + '-pub-net')
97
98     def tearDown(self):
99         """
100         Cleans the remote OpenStack objects
101         """
102         if self.network:
103             neutron_utils.delete_network(self.neutron, self.network)
104
105     def test_create_network(self):
106         """
107         Tests the neutron_utils.create_network() function
108         """
109         self.network = neutron_utils.create_network(
110             self.neutron, self.os_creds, self.net_config.network_settings)
111         self.assertEqual(self.net_config.network_settings.name,
112                          self.network.name)
113         self.assertTrue(validate_network(
114             self.neutron, self.net_config.network_settings.name, True))
115         self.assertEqual(len(self.net_config.network_settings.subnet_settings),
116                          len(self.network.subnets))
117
118     def test_create_network_empty_name(self):
119         """
120         Tests the neutron_utils.create_network() function with an empty
121         network name
122         """
123         with self.assertRaises(Exception):
124             self.network = neutron_utils.create_network(
125                 self.neutron, self.os_creds,
126                 network_settings=NetworkSettings(name=''))
127
128     def test_create_network_null_name(self):
129         """
130         Tests the neutron_utils.create_network() function when the network
131         name is None
132         """
133         with self.assertRaises(Exception):
134             self.network = neutron_utils.create_network(
135                 self.neutron, self.os_creds,
136                 network_settings=NetworkSettings())
137
138
139 class NeutronUtilsSubnetTests(OSComponentTestCase):
140     """
141     Test for creating networks with subnets via neutron_utils.py
142     """
143
144     def setUp(self):
145         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
146         self.port_name = str(guid) + '-port'
147         self.neutron = neutron_utils.neutron_client(self.os_creds)
148         self.network = None
149         self.net_config = openstack_tests.get_pub_net_config(
150             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
151             external_net=self.ext_net_name)
152
153     def tearDown(self):
154         """
155         Cleans the remote OpenStack objects
156         """
157         if self.network:
158             try:
159                 neutron_utils.delete_network(self.neutron, self.network)
160             except:
161                 pass
162
163     def test_create_subnet(self):
164         """
165         Tests the neutron_utils.create_network() function
166         """
167         self.network = neutron_utils.create_network(
168             self.neutron, self.os_creds, self.net_config.network_settings)
169         self.assertEqual(self.net_config.network_settings.name,
170                          self.network.name)
171         self.assertTrue(validate_network(
172             self.neutron, self.net_config.network_settings.name, True))
173
174         subnet_setting = self.net_config.network_settings.subnet_settings[0]
175         self.assertTrue(validate_subnet(
176             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
177
178         subnet_query1 = neutron_utils.get_subnet(
179             self.neutron, subnet_name=subnet_setting.name)
180         self.assertEqual(self.network.subnets[0], subnet_query1)
181
182         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
183                                                              self.network)
184         self.assertIsNotNone(subnet_query2)
185         self.assertEqual(1, len(subnet_query2))
186         self.assertEqual(self.network.subnets[0], subnet_query2[0])
187
188     def test_create_subnet_null_name(self):
189         """
190         Tests the neutron_utils.create_neutron_subnet() function for an
191         Exception when the subnet name is None
192         """
193         self.network = neutron_utils.create_network(
194             self.neutron, self.os_creds, self.net_config.network_settings)
195         self.assertEqual(self.net_config.network_settings.name,
196                          self.network.name)
197         self.assertTrue(validate_network(
198             self.neutron, self.net_config.network_settings.name, True))
199
200         with self.assertRaises(Exception):
201             SubnetSettings(cidr=self.net_config.subnet_cidr)
202
203     def test_create_subnet_empty_name(self):
204         """
205         Tests the neutron_utils.create_network() function with an empty
206         name
207         """
208         self.network = neutron_utils.create_network(
209             self.neutron, self.os_creds, self.net_config.network_settings)
210         self.assertEqual(self.net_config.network_settings.name,
211                          self.network.name)
212         self.assertTrue(validate_network(
213             self.neutron, self.net_config.network_settings.name, True))
214
215         subnet_setting = self.net_config.network_settings.subnet_settings[0]
216         self.assertTrue(validate_subnet(
217             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
218         self.assertFalse(validate_subnet(
219             self.neutron, '', subnet_setting.cidr, True))
220
221         subnet_query1 = neutron_utils.get_subnet(
222             self.neutron, subnet_name=subnet_setting.name)
223         self.assertEqual(self.network.subnets[0], subnet_query1)
224
225         subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
226                                                              self.network)
227         self.assertIsNotNone(subnet_query2)
228         self.assertEqual(1, len(subnet_query2))
229         self.assertEqual(self.network.subnets[0], subnet_query2[0])
230
231     def test_create_subnet_null_cidr(self):
232         """
233         Tests the neutron_utils.create_neutron_subnet() function for an
234         Exception when the subnet CIDR value is None
235         """
236         self.net_config.network_settings.subnet_settings[0].cidr = None
237         with self.assertRaises(Exception):
238             self.network = neutron_utils.create_network(
239                 self.neutron, self.os_creds, self.net_config.network_settings)
240
241     def test_create_subnet_empty_cidr(self):
242         """
243         Tests the neutron_utils.create_neutron_subnet() function for an
244         Exception when the subnet CIDR value is empty
245         """
246         self.net_config.network_settings.subnet_settings[0].cidr = ''
247         with self.assertRaises(Exception):
248             self.network = neutron_utils.create_network(
249                 self.neutron, self.os_creds, self.net_config.network_settings)
250
251
252 class NeutronUtilsIPv6Tests(OSComponentTestCase):
253     """
254     Test for creating IPv6 networks with subnets via neutron_utils.py
255     """
256
257     def setUp(self):
258         self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
259         self.neutron = neutron_utils.neutron_client(self.os_creds)
260         self.network = None
261
262     def tearDown(self):
263         """
264         Cleans the remote OpenStack objects
265         """
266         if self.network:
267             try:
268                 neutron_utils.delete_network(self.neutron, self.network)
269             except:
270                 pass
271
272     def test_create_network_slaac(self):
273         """
274         Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
275         is True and IPv6 modes are slaac
276         """
277         sub_setting = SubnetSettings(
278             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
279             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
280             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
281             enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
282         self.network_settings = NetworkSettings(
283             name=self.guid + '-net', subnet_settings=[sub_setting])
284
285         self.network = neutron_utils.create_network(
286             self.neutron, self.os_creds, self.network_settings)
287         self.assertEqual(self.network_settings.name, self.network.name)
288
289         subnet_settings = self.network_settings.subnet_settings[0]
290         self.assertEqual(1, len(self.network.subnets))
291         subnet = self.network.subnets[0]
292
293         self.assertEqual(self.network.id, subnet.network_id)
294         self.assertEqual(subnet_settings.name, subnet.name)
295         self.assertEqual(subnet_settings.start, subnet.start)
296         self.assertEqual(subnet_settings.end, subnet.end)
297         self.assertEqual('1:1::/64', subnet.cidr)
298         self.assertEqual(6, subnet.ip_version)
299         self.assertEqual(1, len(subnet.dns_nameservers))
300         self.assertEqual(
301             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
302         self.assertTrue(subnet.enable_dhcp)
303         self.assertEqual(
304             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
305         self.assertEqual(
306             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
307
308     def test_create_network_stateful(self):
309         """
310         Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
311         is True and IPv6 modes are stateful
312         """
313         sub_setting = SubnetSettings(
314             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
315             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
316             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
317             enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
318             ipv6_address_mode='dhcpv6-stateful')
319         self.network_settings = NetworkSettings(
320             name=self.guid + '-net', subnet_settings=[sub_setting])
321
322         self.network = neutron_utils.create_network(
323             self.neutron, self.os_creds, self.network_settings)
324
325         self.assertEqual(self.network_settings.name, self.network.name)
326
327         subnet_settings = self.network_settings.subnet_settings[0]
328         self.assertEqual(1, len(self.network.subnets))
329         subnet = self.network.subnets[0]
330
331         self.assertEqual(self.network.id, subnet.network_id)
332         self.assertEqual(subnet_settings.name, subnet.name)
333         self.assertEqual(subnet_settings.start, subnet.start)
334         self.assertEqual(subnet_settings.end, subnet.end)
335         self.assertEqual('1:1::/64', subnet.cidr)
336         self.assertEqual(6, subnet.ip_version)
337         self.assertEqual(1, len(subnet.dns_nameservers))
338         self.assertEqual(
339             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
340         self.assertTrue(subnet.enable_dhcp)
341         self.assertEqual(
342             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
343         self.assertEqual(
344             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
345
346     def test_create_network_stateless(self):
347         """
348         Tests the neutron_utils.create_network() when DHCP is enabled and
349         the RA and address modes are both 'slaac'
350         """
351         sub_setting = SubnetSettings(
352             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
353             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
354             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
355             enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
356             ipv6_address_mode='dhcpv6-stateless')
357         self.network_settings = NetworkSettings(
358             name=self.guid + '-net', subnet_settings=[sub_setting])
359
360         self.network = neutron_utils.create_network(
361             self.neutron, self.os_creds, self.network_settings)
362
363         self.assertEqual(self.network_settings.name, self.network.name)
364
365         subnet_settings = self.network_settings.subnet_settings[0]
366         self.assertEqual(1, len(self.network.subnets))
367         subnet = self.network.subnets[0]
368
369         self.assertEqual(self.network.id, subnet.network_id)
370         self.assertEqual(subnet_settings.name, subnet.name)
371         self.assertEqual(subnet_settings.start, subnet.start)
372         self.assertEqual(subnet_settings.end, subnet.end)
373         self.assertEqual('1:1::/64', subnet.cidr)
374         self.assertEqual(6, subnet.ip_version)
375         self.assertEqual(1, len(subnet.dns_nameservers))
376         self.assertEqual(
377             sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
378         self.assertTrue(subnet.enable_dhcp)
379         self.assertEqual(
380             subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
381         self.assertEqual(
382             subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
383
384     def test_create_network_no_dhcp_slaac(self):
385         """
386         Tests the neutron_utils.create_network() for a BadRequest when
387         DHCP is not enabled and the RA and address modes are both 'slaac'
388         """
389         sub_setting = SubnetSettings(
390             name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
391             ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
392             gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
393             enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
394         self.network_settings = NetworkSettings(
395             name=self.guid + '-net', subnet_settings=[sub_setting])
396
397         with self.assertRaises(BadRequest):
398             self.network = neutron_utils.create_network(
399                 self.neutron, self.os_creds, self.network_settings)
400
401     def test_create_network_invalid_start_ip(self):
402         """
403         Tests the neutron_utils.create_network() that contains one IPv6 subnet
404         with an invalid start IP to ensure Neutron assigns it the smallest IP
405         possible
406         """
407         sub_setting = SubnetSettings(
408             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
409             start='foo')
410         self.network_settings = NetworkSettings(
411             name=self.guid + '-net', subnet_settings=[sub_setting])
412
413         self.network = neutron_utils.create_network(
414             self.neutron, self.os_creds, self.network_settings)
415
416         self.assertEqual('1:1::2', self.network.subnets[0].start)
417         self.assertEqual(
418             '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
419
420     def test_create_network_invalid_end_ip(self):
421         """
422         Tests the neutron_utils.create_network() that contains one IPv6 subnet
423         with an invalid end IP to ensure Neutron assigns it the largest IP
424         possible
425         """
426         sub_setting = SubnetSettings(
427             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
428             end='bar')
429         self.network_settings = NetworkSettings(
430             name=self.guid + '-net', subnet_settings=[sub_setting])
431
432         self.network = neutron_utils.create_network(
433             self.neutron, self.os_creds, self.network_settings)
434
435         self.assertEqual('1:1::2', self.network.subnets[0].start)
436         self.assertEqual(
437             '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
438
439     def test_create_network_with_bad_cidr(self):
440         """
441         Tests the neutron_utils.create_network() for a BadRequest when
442         the subnet CIDR is invalid
443         """
444         sub_setting = SubnetSettings(
445             name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
446         self.network_settings = NetworkSettings(
447             name=self.guid + '-net', subnet_settings=[sub_setting])
448
449         with self.assertRaises(BadRequest):
450             self.network = neutron_utils.create_network(
451                 self.neutron, self.os_creds, self.network_settings)
452
453     def test_create_network_invalid_gateway_ip(self):
454         """
455         Tests the neutron_utils.create_network() for a BadRequest when
456         the subnet gateway IP is invalid
457         """
458         sub_setting = SubnetSettings(
459             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
460             gateway_ip='1:2::1')
461         self.network_settings = NetworkSettings(
462             name=self.guid + '-net', subnet_settings=[sub_setting])
463
464         with self.assertRaises(BadRequest):
465             self.network = neutron_utils.create_network(
466                 self.neutron, self.os_creds, self.network_settings)
467
468     def test_create_network_with_bad_dns(self):
469         """
470         Tests the neutron_utils.create_network() for a BadRequest when
471         the DNS IP is invalid
472         """
473         sub_setting = SubnetSettings(
474             name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
475             dns_nameservers=['foo'])
476         self.network_settings = NetworkSettings(
477             name=self.guid + '-net', subnet_settings=[sub_setting])
478
479         with self.assertRaises(BadRequest):
480             self.network = neutron_utils.create_network(
481                     self.neutron, self.os_creds, self.network_settings)
482
483
484 class NeutronUtilsRouterTests(OSComponentTestCase):
485     """
486     Test for creating routers via neutron_utils.py
487     """
488
489     def setUp(self):
490         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
491         self.port_name = str(guid) + '-port'
492         self.neutron = neutron_utils.neutron_client(self.os_creds)
493         self.network = None
494         self.port = None
495         self.router = None
496         self.interface_router = None
497         self.net_config = openstack_tests.get_pub_net_config(
498             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
499             router_name=guid + '-pub-router', external_net=self.ext_net_name)
500
501     def tearDown(self):
502         """
503         Cleans the remote OpenStack objects
504         """
505         if self.interface_router:
506             neutron_utils.remove_interface_router(
507                 self.neutron, self.router, self.network.subnets[0])
508
509         if self.router:
510             try:
511                 neutron_utils.delete_router(self.neutron, self.router)
512                 validate_router(self.neutron, self.router.name, False)
513             except:
514                 pass
515
516         if self.port:
517             try:
518                 neutron_utils.delete_port(self.neutron, self.port)
519             except:
520                 pass
521
522         if self.network:
523             neutron_utils.delete_network(self.neutron, self.network)
524
525     def test_create_router_simple(self):
526         """
527         Tests the neutron_utils.create_router()
528         """
529         self.router = neutron_utils.create_router(
530             self.neutron, self.os_creds, self.net_config.router_settings)
531         validate_router(self.neutron, self.net_config.router_settings.name,
532                         True)
533
534     def test_create_router_with_public_interface(self):
535         """
536         Tests the neutron_utils.create_router() function with a pubic interface
537         """
538         subnet_setting = self.net_config.network_settings.subnet_settings[0]
539         self.net_config = openstack_tests.OSNetworkConfig(
540             self.net_config.network_settings.name,
541             subnet_setting.name,
542             subnet_setting.cidr,
543             self.net_config.router_settings.name,
544             self.ext_net_name)
545         self.router = neutron_utils.create_router(
546             self.neutron, self.os_creds, self.net_config.router_settings)
547         validate_router(self.neutron, self.net_config.router_settings.name,
548                         True)
549
550         ext_net = neutron_utils.get_network(
551             self.neutron, network_name=self.ext_net_name)
552         self.assertEqual(self.router.external_network_id, ext_net.id)
553
554     def test_add_interface_router(self):
555         """
556         Tests the neutron_utils.add_interface_router() function
557         """
558         self.network = neutron_utils.create_network(
559             self.neutron, self.os_creds, self.net_config.network_settings)
560         self.assertEqual(self.net_config.network_settings.name,
561                          self.network.name)
562         self.assertTrue(validate_network(
563             self.neutron, self.net_config.network_settings.name, True))
564
565         subnet_setting = self.net_config.network_settings.subnet_settings[0]
566         self.assertTrue(validate_subnet(
567             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
568
569         self.router = neutron_utils.create_router(
570             self.neutron, self.os_creds, self.net_config.router_settings)
571         validate_router(self.neutron, self.net_config.router_settings.name,
572                         True)
573
574         self.interface_router = neutron_utils.add_interface_router(
575             self.neutron, self.router, self.network.subnets[0])
576         validate_interface_router(self.interface_router, self.router,
577                                   self.network.subnets[0])
578
579     def test_add_interface_router_null_router(self):
580         """
581         Tests the neutron_utils.add_interface_router() function for an
582         Exception when the router value is None
583         """
584         self.network = neutron_utils.create_network(
585             self.neutron, self.os_creds, self.net_config.network_settings)
586         self.assertEqual(self.net_config.network_settings.name,
587                          self.network.name)
588         self.assertTrue(validate_network(
589             self.neutron, self.net_config.network_settings.name, True))
590
591         subnet_setting = self.net_config.network_settings.subnet_settings[0]
592         self.assertTrue(validate_subnet(
593             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
594
595         with self.assertRaises(NeutronException):
596             self.interface_router = neutron_utils.add_interface_router(
597                 self.neutron, self.router, self.network.subnets[0])
598
599     def test_add_interface_router_null_subnet(self):
600         """
601         Tests the neutron_utils.add_interface_router() function for an
602         Exception when the subnet value is None
603         """
604         self.network = neutron_utils.create_network(
605             self.neutron, self.os_creds, self.net_config.network_settings)
606         self.assertEqual(self.net_config.network_settings.name,
607                          self.network.name)
608         self.assertTrue(validate_network(
609             self.neutron, self.net_config.network_settings.name, True))
610
611         self.router = neutron_utils.create_router(
612             self.neutron, self.os_creds, self.net_config.router_settings)
613         validate_router(self.neutron, self.net_config.router_settings.name,
614                         True)
615
616         with self.assertRaises(NeutronException):
617             self.interface_router = neutron_utils.add_interface_router(
618                 self.neutron, self.router, None)
619
620     def test_add_interface_router_missing_subnet(self):
621         """
622         Tests the neutron_utils.add_interface_router() function for an
623         Exception when the subnet object has been deleted
624         """
625         self.network = neutron_utils.create_network(
626             self.neutron, self.os_creds, self.net_config.network_settings)
627         self.assertEqual(self.net_config.network_settings.name,
628                          self.network.name)
629         self.assertTrue(validate_network(
630             self.neutron, self.net_config.network_settings.name, True))
631
632         self.router = neutron_utils.create_router(
633             self.neutron, self.os_creds, self.net_config.router_settings)
634         validate_router(self.neutron, self.net_config.router_settings.name,
635                         True)
636
637         for subnet in self.network.subnets:
638             neutron_utils.delete_subnet(self.neutron, subnet)
639
640         with self.assertRaises(NotFound):
641             self.interface_router = neutron_utils.add_interface_router(
642                 self.neutron, self.router, self.network.subnets[0])
643
644     def test_create_port(self):
645         """
646         Tests the neutron_utils.create_port() function
647         """
648         self.network = neutron_utils.create_network(
649             self.neutron, self.os_creds, self.net_config.network_settings)
650         self.assertEqual(self.net_config.network_settings.name,
651                          self.network.name)
652         self.assertTrue(validate_network(
653             self.neutron, self.net_config.network_settings.name, True))
654
655         subnet_setting = self.net_config.network_settings.subnet_settings[0]
656         self.assertTrue(validate_subnet(
657             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
658
659         self.port = neutron_utils.create_port(
660             self.neutron, self.os_creds, PortSettings(
661                 name=self.port_name,
662                 ip_addrs=[{
663                     'subnet_name': subnet_setting.name,
664                     'ip': ip_1}],
665                 network_name=self.net_config.network_settings.name))
666         validate_port(self.neutron, self.port, self.port_name)
667
668     def test_create_port_empty_name(self):
669         """
670         Tests the neutron_utils.create_port() function
671         """
672         self.network = neutron_utils.create_network(
673             self.neutron, self.os_creds, self.net_config.network_settings)
674         self.assertEqual(self.net_config.network_settings.name,
675                          self.network.name)
676         self.assertTrue(validate_network(
677             self.neutron, self.net_config.network_settings.name, True))
678
679         subnet_setting = self.net_config.network_settings.subnet_settings[0]
680         self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
681                                         subnet_setting.cidr, True))
682
683         self.port = neutron_utils.create_port(
684             self.neutron, self.os_creds, PortSettings(
685                 name=self.port_name,
686                 network_name=self.net_config.network_settings.name,
687                 ip_addrs=[{
688                     'subnet_name': subnet_setting.name,
689                     'ip': ip_1}]))
690         validate_port(self.neutron, self.port, self.port_name)
691
692     def test_create_port_null_name(self):
693         """
694         Tests the neutron_utils.create_port() when the port name value is None
695         """
696         self.network = neutron_utils.create_network(
697             self.neutron, self.os_creds, self.net_config.network_settings)
698         self.assertEqual(self.net_config.network_settings.name,
699                          self.network.name)
700         self.assertTrue(validate_network(
701             self.neutron, self.net_config.network_settings.name, True))
702
703         subnet_setting = self.net_config.network_settings.subnet_settings[0]
704         self.assertTrue(validate_subnet(
705             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
706
707         self.port = neutron_utils.create_port(
708             self.neutron, self.os_creds,
709             PortSettings(
710                 network_name=self.net_config.network_settings.name,
711                 ip_addrs=[{
712                     'subnet_name': subnet_setting.name,
713                     'ip': ip_1}]))
714
715         port = neutron_utils.get_port_by_id(self.neutron, self.port.id)
716         self.assertEqual(self.port, port)
717
718     def test_create_port_null_network_object(self):
719         """
720         Tests the neutron_utils.create_port() function for an Exception when
721         the network object is None
722         """
723         with self.assertRaises(Exception):
724             self.port = neutron_utils.create_port(
725                 self.neutron, self.os_creds,
726                 PortSettings(
727                     name=self.port_name,
728                     network_name=self.net_config.network_settings.name,
729                     ip_addrs=[{
730                         'subnet_name':
731                             self.net_config.network_settings.subnet_settings[
732                                 0].name,
733                         'ip': ip_1}]))
734
735     def test_create_port_null_ip(self):
736         """
737         Tests the neutron_utils.create_port() function for an Exception when
738         the IP value is None
739         """
740         self.network = neutron_utils.create_network(
741             self.neutron, self.os_creds, self.net_config.network_settings)
742         self.assertEqual(self.net_config.network_settings.name,
743                          self.network.name)
744         self.assertTrue(validate_network(
745             self.neutron, self.net_config.network_settings.name, True))
746
747         subnet_setting = self.net_config.network_settings.subnet_settings[0]
748         self.assertTrue(validate_subnet(
749             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
750
751         with self.assertRaises(Exception):
752             self.port = neutron_utils.create_port(
753                 self.neutron, self.os_creds,
754                 PortSettings(
755                     name=self.port_name,
756                     network_name=self.net_config.network_settings.name,
757                     ip_addrs=[{
758                         'subnet_name': subnet_setting.name,
759                         'ip': None}]))
760
761     def test_create_port_invalid_ip(self):
762         """
763         Tests the neutron_utils.create_port() function for an Exception when
764         the IP value is None
765         """
766         self.network = neutron_utils.create_network(
767             self.neutron, self.os_creds, self.net_config.network_settings)
768         self.assertEqual(self.net_config.network_settings.name,
769                          self.network.name)
770         self.assertTrue(validate_network(
771             self.neutron, self.net_config.network_settings.name, True))
772
773         subnet_setting = self.net_config.network_settings.subnet_settings[0]
774         self.assertTrue(validate_subnet(
775             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
776
777         with self.assertRaises(Exception):
778             self.port = neutron_utils.create_port(
779                 self.neutron, self.os_creds,
780                 PortSettings(
781                     name=self.port_name,
782                     network_name=self.net_config.network_settings.name,
783                     ip_addrs=[{
784                         'subnet_name': subnet_setting.name,
785                         'ip': 'foo'}]))
786
787     def test_create_port_invalid_ip_to_subnet(self):
788         """
789         Tests the neutron_utils.create_port() function for an Exception when
790         the IP value is None
791         """
792         self.network = neutron_utils.create_network(
793             self.neutron, self.os_creds, self.net_config.network_settings)
794         self.assertEqual(self.net_config.network_settings.name,
795                          self.network.name)
796         self.assertTrue(validate_network(
797             self.neutron, self.net_config.network_settings.name, True))
798
799         subnet_setting = self.net_config.network_settings.subnet_settings[0]
800         self.assertTrue(validate_subnet(
801             self.neutron, subnet_setting.name, subnet_setting.cidr, True))
802
803         with self.assertRaises(Exception):
804             self.port = neutron_utils.create_port(
805                 self.neutron, self.os_creds,
806                 PortSettings(
807                     name=self.port_name,
808                     network_name=self.net_config.network_settings.name,
809                     ip_addrs=[{
810                         'subnet_name': subnet_setting.name,
811                         'ip': '10.197.123.100'}]))
812
813
814 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
815     """
816     Test for creating security groups via neutron_utils.py
817     """
818
819     def setUp(self):
820         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
821         self.sec_grp_name = guid + 'name'
822
823         self.security_groups = list()
824         self.security_group_rules = list()
825         self.neutron = neutron_utils.neutron_client(self.os_creds)
826         self.keystone = keystone_utils.keystone_client(self.os_creds)
827
828     def tearDown(self):
829         """
830         Cleans the remote OpenStack objects
831         """
832         for rule in self.security_group_rules:
833             neutron_utils.delete_security_group_rule(self.neutron, rule)
834
835         for security_group in self.security_groups:
836             try:
837                 neutron_utils.delete_security_group(self.neutron,
838                                                     security_group)
839             except:
840                 pass
841
842     def test_create_delete_simple_sec_grp(self):
843         """
844         Tests the neutron_utils.create_security_group() function
845         """
846         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
847         security_group = neutron_utils.create_security_group(self.neutron,
848                                                              self.keystone,
849                                                              sec_grp_settings)
850
851         self.assertTrue(sec_grp_settings.name, security_group.name)
852
853         sec_grp_get = neutron_utils.get_security_group(
854             self.neutron, sec_grp_settings=sec_grp_settings)
855         self.assertIsNotNone(sec_grp_get)
856         self.assertTrue(validation_utils.objects_equivalent(
857             security_group, sec_grp_get))
858
859         neutron_utils.delete_security_group(self.neutron, security_group)
860         sec_grp_get = neutron_utils.get_security_group(
861             self.neutron, sec_grp_settings=sec_grp_settings)
862         self.assertIsNone(sec_grp_get)
863
864     def test_create_sec_grp_no_name(self):
865         """
866         Tests the SecurityGroupSettings constructor and
867         neutron_utils.create_security_group() function to ensure that
868         attempting to create a security group without a name will raise an
869         exception
870         """
871         with self.assertRaises(Exception):
872             sec_grp_settings = SecurityGroupSettings()
873             self.security_groups.append(
874                 neutron_utils.create_security_group(self.neutron,
875                                                     self.keystone,
876                                                     sec_grp_settings))
877
878     def test_create_sec_grp_no_rules(self):
879         """
880         Tests the neutron_utils.create_security_group() function
881         """
882         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
883                                                  description='hello group')
884         self.security_groups.append(
885             neutron_utils.create_security_group(self.neutron, self.keystone,
886                                                 sec_grp_settings))
887
888         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
889         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
890
891         sec_grp_get = neutron_utils.get_security_group(
892             self.neutron, sec_grp_settings=sec_grp_settings)
893         self.assertIsNotNone(sec_grp_get)
894         self.assertEqual(self.security_groups[0], sec_grp_get)
895
896     def test_create_sec_grp_one_rule(self):
897         """
898         Tests the neutron_utils.create_security_group() function
899         """
900
901         sec_grp_rule_settings = SecurityGroupRuleSettings(
902             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
903         sec_grp_settings = SecurityGroupSettings(
904             name=self.sec_grp_name, description='hello group',
905             rule_settings=[sec_grp_rule_settings])
906
907         self.security_groups.append(
908             neutron_utils.create_security_group(self.neutron, self.keystone,
909                                                 sec_grp_settings))
910         free_rules = neutron_utils.get_rules_by_security_group(
911             self.neutron, self.security_groups[0])
912         for free_rule in free_rules:
913             self.security_group_rules.append(free_rule)
914
915         self.security_group_rules.append(
916             neutron_utils.create_security_group_rule(
917                 self.neutron, sec_grp_settings.rule_settings[0]))
918
919         # Refresh object so it is populated with the newly added rule
920         security_group = neutron_utils.get_security_group(
921             self.neutron, sec_grp_settings=sec_grp_settings)
922
923         rules = neutron_utils.get_rules_by_security_group(self.neutron,
924                                                           security_group)
925
926         self.assertTrue(
927             validation_utils.objects_equivalent(
928                  self.security_group_rules, rules))
929
930         self.assertTrue(sec_grp_settings.name, security_group.name)
931
932         sec_grp_get = neutron_utils.get_security_group(
933             self.neutron, sec_grp_settings=sec_grp_settings)
934         self.assertIsNotNone(sec_grp_get)
935         self.assertEqual(security_group, sec_grp_get)
936
937     def test_get_sec_grp_by_id(self):
938         """
939         Tests the neutron_utils.create_security_group() function
940         """
941
942         self.security_groups.append(neutron_utils.create_security_group(
943             self.neutron, self.keystone,
944             SecurityGroupSettings(name=self.sec_grp_name + '-1',
945                                   description='hello group')))
946         self.security_groups.append(neutron_utils.create_security_group(
947             self.neutron, self.keystone,
948             SecurityGroupSettings(name=self.sec_grp_name + '-2',
949                                   description='hello group')))
950
951         sec_grp_1b = neutron_utils.get_security_group_by_id(
952             self.neutron, self.security_groups[0].id)
953         sec_grp_2b = neutron_utils.get_security_group_by_id(
954             self.neutron, self.security_groups[1].id)
955
956         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
957         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
958
959
960 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
961     """
962     Test basic nova keypair functionality
963     """
964
965     def setUp(self):
966         """
967         Instantiates the CreateImage object that is responsible for downloading
968         and creating an OS image file within OpenStack
969         """
970         self.neutron = neutron_utils.neutron_client(self.os_creds)
971         self.floating_ip = None
972
973     def tearDown(self):
974         """
975         Cleans the image and downloaded image file
976         """
977         if self.floating_ip:
978             try:
979                 neutron_utils.delete_floating_ip(
980                     self.neutron, self.floating_ip)
981             except:
982                 pass
983
984     def test_floating_ips(self):
985         """
986         Tests the creation of a floating IP
987         :return:
988         """
989         initial_fips = neutron_utils.get_floating_ips(self.neutron)
990
991         self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
992                                                             self.ext_net_name)
993         all_fips = neutron_utils.get_floating_ips(self.neutron)
994         self.assertEqual(len(initial_fips) + 1, len(all_fips))
995         returned = neutron_utils.get_floating_ip(self.neutron,
996                                                  self.floating_ip)
997         self.assertEqual(self.floating_ip.id, returned.id)
998         self.assertEqual(self.floating_ip.ip, returned.ip)
999
1000
1001 """
1002 Validation routines
1003 """
1004
1005
1006 def validate_network(neutron, name, exists):
1007     """
1008     Returns true if a network for a given name DOES NOT exist if the exists
1009     parameter is false conversely true. Returns false if a network for a given
1010     name DOES exist if the exists parameter is true conversely false.
1011     :param neutron: The neutron client
1012     :param name: The expected network name
1013     :param exists: Whether or not the network name should exist or not
1014     :return: True/False
1015     """
1016     network = neutron_utils.get_network(neutron, network_name=name)
1017     if exists and network:
1018         return True
1019     if not exists and not network:
1020         return True
1021     return False
1022
1023
1024 def validate_subnet(neutron, name, cidr, exists):
1025     """
1026     Returns true if a subnet for a given name DOES NOT exist if the exists
1027     parameter is false conversely true. Returns false if a subnet for a given
1028     name DOES exist if the exists parameter is true conversely false.
1029     :param neutron: The neutron client
1030     :param name: The expected subnet name
1031     :param cidr: The expected CIDR value
1032     :param exists: Whether or not the network name should exist or not
1033     :return: True/False
1034     """
1035     subnet = neutron_utils.get_subnet(neutron, subnet_name=name)
1036     if exists and subnet and subnet.name == name:
1037         return subnet.cidr == cidr
1038     if not exists and not subnet:
1039         return True
1040     return False
1041
1042
1043 def validate_router(neutron, name, exists):
1044     """
1045     Returns true if a router for a given name DOES NOT exist if the exists
1046     parameter is false conversely true. Returns false if a router for a given
1047     name DOES exist if the exists parameter is true conversely false.
1048     :param neutron: The neutron client
1049     :param name: The expected router name
1050     :param exists: Whether or not the network name should exist or not
1051     :return: True/False
1052     """
1053     router = neutron_utils.get_router(neutron, router_name=name)
1054     if exists and router:
1055         return True
1056     return False
1057
1058
1059 def validate_interface_router(interface_router, router, subnet):
1060     """
1061     Returns true if the router ID & subnet ID have been properly included into
1062     the interface router object
1063     :param interface_router: the SNAPS-OO InterfaceRouter domain object
1064     :param router: to validate against the interface_router
1065     :param subnet: to validate against the interface_router
1066     :return: True if both IDs match else False
1067     """
1068     subnet_id = interface_router.subnet_id
1069     router_id = interface_router.port_id
1070
1071     return subnet.id == subnet_id and router.id == router_id
1072
1073
1074 def validate_port(neutron, port_obj, this_port_name):
1075     """
1076     Returns true if a port for a given name DOES NOT exist if the exists
1077     parameter is false conversely true. Returns false if a port for a given
1078     name DOES exist if the exists parameter is true conversely false.
1079     :param neutron: The neutron client
1080     :param port_obj: The port object to lookup
1081     :param this_port_name: The expected router name
1082     :return: True/False
1083     """
1084     os_ports = neutron.list_ports()
1085     for os_port, os_port_insts in os_ports.items():
1086         for os_inst in os_port_insts:
1087             if os_inst['id'] == port_obj.id:
1088                 return os_inst['name'] == this_port_name
1089     return False