db3170e35717848834d9a9cbefca93d4958fb84e
[snaps.git] / snaps / openstack / tests / create_router_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import unittest
16 import uuid
17
18 from snaps.openstack import create_network
19 from snaps.openstack import create_router
20 from snaps.openstack.create_network import (
21     NetworkSettings, PortSettings)
22 from snaps.openstack.create_network import OpenStackNetwork
23 from snaps.openstack.create_router import (
24     RouterSettings, RouterSettingsError)
25 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
26 from snaps.openstack.utils import neutron_utils
27
28 __author__ = 'mmakati'
29
30 cidr1 = '10.200.201.0/24'
31 cidr2 = '10.200.202.0/24'
32 static_gateway_ip1 = '10.200.201.1'
33 static_gateway_ip2 = '10.200.202.1'
34
35
36 class RouterSettingsUnitTests(unittest.TestCase):
37     """
38     Class for testing the RouterSettings class
39     """
40
41     def test_no_params(self):
42         with self.assertRaises(RouterSettingsError):
43             RouterSettings()
44
45     def test_empty_config(self):
46         with self.assertRaises(RouterSettingsError):
47             RouterSettings(**dict())
48
49     def test_name_only(self):
50         settings = RouterSettings(name='foo')
51         self.assertEqual('foo', settings.name)
52         self.assertIsNone(settings.project_name)
53         self.assertIsNone(settings.external_gateway)
54         self.assertIsNone(settings.admin_state_up)
55         self.assertIsNone(settings.enable_snat)
56         self.assertIsNone(settings.external_fixed_ips)
57         self.assertIsNotNone(settings.internal_subnets)
58         self.assertTrue(isinstance(settings.internal_subnets, list))
59         self.assertEqual(0, len(settings.internal_subnets))
60         self.assertIsNotNone(settings.port_settings)
61         self.assertTrue(isinstance(settings.port_settings, list))
62         self.assertEqual(0, len(settings.port_settings))
63
64     def test_config_with_name_only(self):
65         settings = RouterSettings(**{'name': 'foo'})
66         self.assertEqual('foo', settings.name)
67         self.assertIsNone(settings.project_name)
68         self.assertIsNone(settings.external_gateway)
69         self.assertIsNone(settings.admin_state_up)
70         self.assertIsNone(settings.enable_snat)
71         self.assertIsNone(settings.external_fixed_ips)
72         self.assertIsNotNone(settings.internal_subnets)
73         self.assertTrue(isinstance(settings.internal_subnets, list))
74         self.assertEqual(0, len(settings.internal_subnets))
75         self.assertIsNotNone(settings.port_settings)
76         self.assertTrue(isinstance(settings.port_settings, list))
77         self.assertEqual(0, len(settings.port_settings))
78
79     def test_all(self):
80         port_settings = PortSettings(name='foo', network_name='bar')
81         settings = RouterSettings(
82             name='foo', project_name='bar', external_gateway='foo_gateway',
83             admin_state_up=True, enable_snat=False, external_fixed_ips=['ip1'],
84             internal_subnets=['10.0.0.1/24'], interfaces=[port_settings])
85         self.assertEqual('foo', settings.name)
86         self.assertEqual('bar', settings.project_name)
87         self.assertEqual('foo_gateway', settings.external_gateway)
88         self.assertTrue(settings.admin_state_up)
89         self.assertFalse(settings.enable_snat)
90         self.assertEqual(['ip1'], settings.external_fixed_ips)
91         self.assertIsNotNone(settings.internal_subnets)
92         self.assertTrue(isinstance(settings.internal_subnets, list))
93         self.assertEqual(1, len(settings.internal_subnets))
94         self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
95         self.assertEqual([port_settings], settings.port_settings)
96
97     def test_config_all(self):
98         settings = RouterSettings(
99             **{'name': 'foo', 'project_name': 'bar',
100                'external_gateway': 'foo_gateway', 'admin_state_up': True,
101                'enable_snat': False, 'external_fixed_ips': ['ip1'],
102                'internal_subnets': ['10.0.0.1/24'],
103                'interfaces':
104                    [{'port': {'name': 'foo-port',
105                               'network_name': 'bar-net'}}]})
106         self.assertEqual('foo', settings.name)
107         self.assertEqual('bar', settings.project_name)
108         self.assertEqual('foo_gateway', settings.external_gateway)
109         self.assertTrue(settings.admin_state_up)
110         self.assertFalse(settings.enable_snat)
111         self.assertEqual(['ip1'], settings.external_fixed_ips)
112         self.assertIsNotNone(settings.internal_subnets)
113         self.assertTrue(isinstance(settings.internal_subnets, list))
114         self.assertEqual(1, len(settings.internal_subnets))
115         self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
116         self.assertEqual([PortSettings(**{'name': 'foo-port',
117                                           'network_name': 'bar-net'})],
118                          settings.port_settings)
119
120
121 class CreateRouterSuccessTests(OSIntegrationTestCase):
122     """
123     Class for testing routers with various positive scenarios expected to
124     succeed
125     """
126
127     def setUp(self):
128         """
129         Initializes objects used for router testing
130         """
131         super(self.__class__, self).__start__()
132
133         self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
134         self.router_creator = None
135         self.network_creator1 = None
136         self.network_creator2 = None
137         self.neutron = neutron_utils.neutron_client(self.os_creds)
138
139     def tearDown(self):
140         """
141         Cleans the remote OpenStack objects used for router testing
142         """
143         if self.router_creator:
144             self.router_creator.clean()
145
146         if self.network_creator1:
147             self.network_creator1.clean()
148
149         if self.network_creator2:
150             self.network_creator2.clean()
151
152         super(self.__class__, self).__clean__()
153
154     def test_create_router_vanilla(self):
155         """
156         Test creation of a most basic router with minimal options.
157         """
158         router_settings = RouterSettings(name=self.guid + '-pub-router',
159                                          external_gateway=self.ext_net_name)
160
161         self.router_creator = create_router.OpenStackRouter(self.os_creds,
162                                                             router_settings)
163         self.router_creator.create()
164
165         router = neutron_utils.get_router(self.neutron,
166                                           router_settings=router_settings)
167         self.assertIsNotNone(router)
168
169         self.assertTrue(verify_router_attributes(
170             router, self.router_creator, ext_gateway=self.ext_net_name))
171
172     def test_create_router_admin_user_to_new_project(self):
173         """
174         Test creation of a most basic router with the admin user pointing
175         to the new project.
176         """
177         router_settings = RouterSettings(
178             name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
179             project_name=self.os_creds.project_name)
180
181         self.router_creator = create_router.OpenStackRouter(
182             self.admin_os_creds, router_settings)
183         self.router_creator.create()
184
185         router = neutron_utils.get_router(self.neutron,
186                                           router_settings=router_settings)
187         self.assertIsNotNone(router)
188
189         self.assertTrue(verify_router_attributes(
190             router, self.router_creator, ext_gateway=self.ext_net_name))
191
192     def test_create_router_new_user_to_admin_project(self):
193         """
194         Test creation of a most basic router with the new user pointing
195         to the admin project.
196         """
197         router_settings = RouterSettings(
198             name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
199             project_name=self.admin_os_creds.project_name)
200
201         self.router_creator = create_router.OpenStackRouter(
202             self.os_creds, router_settings)
203         self.router_creator.create()
204
205         router = neutron_utils.get_router(self.neutron,
206                                           router_settings=router_settings)
207         self.assertIsNotNone(router)
208
209         self.assertTrue(verify_router_attributes(
210             router, self.router_creator, ext_gateway=self.ext_net_name))
211
212     def test_create_delete_router(self):
213         """
214         Test that clean() will not raise an exception if the router is deleted
215         by another process.
216         """
217         self.router_settings = RouterSettings(
218             name=self.guid + '-pub-router', external_gateway=self.ext_net_name)
219
220         self.router_creator = create_router.OpenStackRouter(
221             self.os_creds, self.router_settings)
222         created_router = self.router_creator.create()
223         self.assertIsNotNone(created_router)
224         retrieved_router = neutron_utils.get_router(
225             self.neutron, router_settings=self.router_settings)
226         self.assertIsNotNone(retrieved_router)
227
228         neutron_utils.delete_router(self.neutron, created_router)
229
230         retrieved_router = neutron_utils.get_router(
231             self.neutron, router_settings=self.router_settings)
232         self.assertIsNone(retrieved_router)
233
234         # Should not raise an exception
235         self.router_creator.clean()
236
237     def test_create_router_admin_state_false(self):
238         """
239         Test creation of a basic router with admin state down.
240         """
241         router_settings = RouterSettings(name=self.guid + '-pub-router',
242                                          admin_state_up=False)
243
244         self.router_creator = create_router.OpenStackRouter(self.os_creds,
245                                                             router_settings)
246         self.router_creator.create()
247
248         router = neutron_utils.get_router(self.neutron,
249                                           router_settings=router_settings)
250         self.assertIsNotNone(router)
251
252         self.assertTrue(verify_router_attributes(router, self.router_creator,
253                                                  admin_state=False))
254
255     def test_create_router_admin_state_True(self):
256         """
257         Test creation of a basic router with admin state Up.
258         """
259         router_settings = RouterSettings(name=self.guid + '-pub-router',
260                                          admin_state_up=True)
261
262         self.router_creator = create_router.OpenStackRouter(self.os_creds,
263                                                             router_settings)
264         self.router_creator.create()
265
266         router = neutron_utils.get_router(self.neutron,
267                                           router_settings=router_settings)
268         self.assertIsNotNone(router)
269
270         self.assertTrue(verify_router_attributes(router, self.router_creator,
271                                                  admin_state=True))
272
273     def test_create_router_private_network(self):
274         """
275         Test creation of a router connected with two private networks and no
276         external gateway
277         """
278         network_settings1 = NetworkSettings(
279             name=self.guid + '-pub-net1',
280             subnet_settings=[
281                 create_network.SubnetSettings(
282                     cidr=cidr1, name=self.guid + '-pub-subnet1',
283                     gateway_ip=static_gateway_ip1)])
284         network_settings2 = NetworkSettings(
285             name=self.guid + '-pub-net2',
286             subnet_settings=[
287                 create_network.SubnetSettings(
288                     cidr=cidr2, name=self.guid + '-pub-subnet2',
289                     gateway_ip=static_gateway_ip2)])
290
291         self.network_creator1 = OpenStackNetwork(self.os_creds,
292                                                  network_settings1)
293         self.network_creator2 = OpenStackNetwork(self.os_creds,
294                                                  network_settings2)
295
296         self.network_creator1.create()
297         self.network_creator2.create()
298
299         port_settings = [
300             create_network.PortSettings(
301                 name=self.guid + '-port1',
302                 ip_addrs=[{
303                     'subnet_name':
304                         network_settings1.subnet_settings[0].name,
305                     'ip': static_gateway_ip1
306                 }],
307                 network_name=network_settings1.name,
308                 project_name=self.os_creds.project_name),
309             create_network.PortSettings(
310                 name=self.guid + '-port2',
311                 ip_addrs=[{
312                     'subnet_name': network_settings2.subnet_settings[0].name,
313                     'ip': static_gateway_ip2
314                 }],
315                 network_name=network_settings2.name,
316                 project_name=self.os_creds.project_name)]
317
318         router_settings = RouterSettings(name=self.guid + '-pub-router',
319                                          port_settings=port_settings)
320         self.router_creator = create_router.OpenStackRouter(self.os_creds,
321                                                             router_settings)
322         self.router_creator.create()
323
324         router = neutron_utils.get_router(self.neutron,
325                                           router_settings=router_settings)
326
327         self.assertTrue(verify_router_attributes(router, self.router_creator))
328
329         # Instantiate second identical creator to ensure a second router
330         # has not been created
331         router_creator2 = create_router.OpenStackRouter(
332             self.os_creds, router_settings)
333         router2 = router_creator2.create()
334         self.assertIsNotNone(self.router_creator.get_router(), router2)
335
336     def test_create_router_external_network(self):
337         """
338         Test creation of a router connected to an external network and a
339         private network.
340         """
341         network_settings = NetworkSettings(
342             name=self.guid + '-pub-net1',
343             subnet_settings=[
344                 create_network.SubnetSettings(
345                     cidr=cidr1, name=self.guid + '-pub-subnet1',
346                     gateway_ip=static_gateway_ip1)])
347         self.network_creator1 = OpenStackNetwork(self.os_creds,
348                                                  network_settings)
349         self.network_creator1.create()
350
351         port_settings = [
352             create_network.PortSettings(
353                 name=self.guid + '-port1',
354                 ip_addrs=[{
355                     'subnet_name': network_settings.subnet_settings[0].name,
356                     'ip': static_gateway_ip1}],
357                 network_name=network_settings.name,
358                 project_name=self.os_creds.project_name)]
359
360         router_settings = RouterSettings(
361             name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
362             port_settings=port_settings)
363         self.router_creator = create_router.OpenStackRouter(self.os_creds,
364                                                             router_settings)
365         self.router_creator.create()
366
367         router = neutron_utils.get_router(self.neutron,
368                                           router_settings=router_settings)
369
370         self.assertTrue(verify_router_attributes(
371             router, self.router_creator, ext_gateway=self.ext_net_name))
372
373
374 class CreateRouterNegativeTests(OSIntegrationTestCase):
375     """
376     Class for testing routers with various negative scenarios expected to fail.
377     """
378
379     def setUp(self):
380         """
381         Initializes objects used for router testing
382         """
383         super(self.__class__, self).__start__()
384
385         self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
386         self.router_creator = None
387
388     def tearDown(self):
389         """
390         Cleans the remote OpenStack objects used for router testing
391         """
392         if self.router_creator:
393             self.router_creator.clean()
394
395         super(self.__class__, self).__clean__()
396
397     def test_create_router_noname(self):
398         """
399         Test creating a router without a name.
400         """
401         with self.assertRaises(RouterSettingsError):
402             router_settings = RouterSettings(
403                 name=None, external_gateway=self.ext_net_name)
404             self.router_creator = create_router.OpenStackRouter(
405                 self.os_creds, router_settings)
406             self.router_creator.create()
407
408     def test_create_router_invalid_gateway_name(self):
409         """
410         Test creating a router without a valid network gateway name.
411         """
412         with self.assertRaises(RouterSettingsError):
413             router_settings = RouterSettings(name=self.guid + '-pub-router',
414                                              external_gateway="Invalid_name")
415             self.router_creator = create_router.OpenStackRouter(
416                 self.os_creds, router_settings)
417             self.router_creator.create()
418
419
420 def verify_router_attributes(router_operational, router_creator,
421                              admin_state=True, ext_gateway=None):
422     """
423     Helper function to validate the attributes of router created with the one
424     operational
425     :param router_operational: Operational Router object returned from neutron
426                                utils of type snaps.domain.Router
427     :param router_creator: router_creator object returned from creating a
428                            router in the router test functions
429     :param admin_state: True if router is expected to be Up, else False
430     :param ext_gateway: None if router is not connected to external gateway
431     :return:
432     """
433
434     router = router_creator.get_router()
435
436     if not router_operational:
437         return False
438     elif not router_creator:
439         return False
440     elif not (router_operational.name == router_creator.router_settings.name):
441         return False
442     elif not (router_operational.id == router.id):
443         return False
444     elif not (router_operational.status == router.status):
445         return False
446     elif not (router_operational.tenant_id == router.tenant_id):
447         return False
448     elif not (admin_state == router_operational.admin_state_up):
449         return False
450     elif (ext_gateway is None) and \
451             (router_operational.external_gateway_info is not None):
452         return False
453     elif ext_gateway is not None:
454         if router_operational.external_gateway_info is None:
455             return False
456     return True