1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.config.network import PortConfig, NetworkConfig
19 from snaps.config.router import RouterConfigError, RouterConfig
20 from snaps.openstack import create_network
21 from snaps.openstack import create_router
22 from snaps.openstack.create_network import OpenStackNetwork
23 from snaps.openstack.create_router import RouterSettings
24 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
25 from snaps.openstack.utils import neutron_utils, settings_utils
__author__ = 'mmakati'

# Address plan shared by the integration tests below: one CIDR per private
# test subnet, plus the address used both as that subnet's gateway_ip and as
# the fixed IP of the router port attached to it.
cidr1 = '10.200.201.0/24'
cidr2 = '10.200.202.0/24'
static_gateway_ip1 = '10.200.201.1'
static_gateway_ip2 = '10.200.202.1'
class RouterSettingsUnitTests(unittest.TestCase):
    """
    Class for testing the RouterSettings class

    Each scenario is exercised twice: once with keyword arguments and once
    with the same values unpacked from a dict (the "config" variants).
    """

    def _assert_name_only(self, settings):
        """
        Asserts the attribute values expected from a RouterSettings object
        that was constructed with nothing but name='foo'
        :param settings: the RouterSettings object to check
        """
        self.assertEqual('foo', settings.name)
        self.assertIsNone(settings.project_name)
        self.assertIsNone(settings.external_gateway)
        self.assertTrue(settings.admin_state_up)
        self.assertIsNone(settings.enable_snat)
        # assertIsInstance also rejects None and reports the actual type on
        # failure, unlike assertTrue(isinstance(...))
        self.assertIsInstance(settings.internal_subnets, list)
        self.assertEqual(0, len(settings.internal_subnets))
        self.assertIsInstance(settings.port_settings, list)
        self.assertEqual(0, len(settings.port_settings))

    def _assert_all_values(self, settings, expected_ports):
        """
        Asserts the attribute values expected from a RouterSettings object
        that was constructed with every supported parameter
        :param settings: the RouterSettings object to check
        :param expected_ports: the list of PortConfig objects that
                               settings.port_settings must equal
        """
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.project_name)
        self.assertEqual('foo_gateway', settings.external_gateway)
        self.assertTrue(settings.admin_state_up)
        self.assertFalse(settings.enable_snat)
        self.assertIsInstance(settings.internal_subnets, list)
        self.assertEqual(1, len(settings.internal_subnets))
        self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
        self.assertEqual(expected_ports, settings.port_settings)

    def test_no_params(self):
        """
        Construction without any arguments must raise RouterConfigError
        """
        with self.assertRaises(RouterConfigError):
            RouterSettings()

    def test_empty_config(self):
        """
        Construction from an empty dict must raise RouterConfigError
        """
        with self.assertRaises(RouterConfigError):
            RouterSettings(**dict())

    def test_name_only(self):
        """
        Only the name is given as a keyword; all else must be defaulted
        """
        self._assert_name_only(RouterSettings(name='foo'))

    def test_config_with_name_only(self):
        """
        Only the name is given via a dict; all else must be defaulted
        """
        self._assert_name_only(RouterSettings(**{'name': 'foo'}))

    def test_all(self):
        """
        Every parameter is given as a keyword, the interface being passed as
        a ready-made PortConfig object
        """
        port_settings = PortConfig(name='foo', network_name='bar')
        settings = RouterSettings(
            name='foo', project_name='bar', external_gateway='foo_gateway',
            admin_state_up=True, enable_snat=False,
            internal_subnets=['10.0.0.1/24'], interfaces=[port_settings])
        self._assert_all_values(settings, [port_settings])

    def test_config_all(self):
        """
        Every parameter is given via a dict, the interface being passed as a
        nested {'port': {...}} dict
        """
        settings = RouterSettings(
            **{'name': 'foo', 'project_name': 'bar',
               'external_gateway': 'foo_gateway', 'admin_state_up': True,
               'enable_snat': False, 'internal_subnets': ['10.0.0.1/24'],
               'interfaces':
                   [{'port': {'name': 'foo-port',
                              'network_name': 'bar-net'}}]})
        self._assert_all_values(
            settings,
            [PortConfig(**{'name': 'foo-port', 'network_name': 'bar-net'})])
class CreateRouterSuccessTests(OSIntegrationTestCase):
    """
    Class for testing routers with various positive scenarios expected to
    succeed
    """

    def setUp(self):
        """
        Initializes objects used for router testing
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(CreateRouterSuccessTests, self).__start__()

        # Unique prefix so objects from concurrent/aborted runs cannot clash
        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.router_creator = None
        self.network_creator1 = None
        self.network_creator2 = None
        self.neutron = neutron_utils.neutron_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects used for router testing
        """
        # The router is cleaned before the networks it may be attached to
        if self.router_creator:
            self.router_creator.clean()

        if self.network_creator1:
            self.network_creator1.clean()

        if self.network_creator2:
            self.network_creator2.clean()

        super(CreateRouterSuccessTests, self).__clean__()

    def test_create_router_vanilla(self):
        """
        Test creation of a most basic router with minimal options.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router(), router)

        self.check_router_recreation(router, router_settings)

    def test_create_router_admin_user_to_new_project(self):
        """
        Test creation of a most basic router with the admin user pointing
        to the new project.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            project_name=self.os_creds.project_name)

        self.router_creator = create_router.OpenStackRouter(
            self.admin_os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)
        self.assertIsNotNone(router)

        # Only the IDs are compared here, not the whole domain objects
        self.assertEqual(self.router_creator.get_router().id, router.id)

        self.check_router_recreation(router, router_settings)

    def test_create_router_new_user_as_admin_project(self):
        """
        Test creation of a most basic router with the new user pointing
        to the admin project.
        """
        # NOTE(review): this body is identical to
        # test_create_router_admin_user_to_new_project (admin credentials,
        # new project name) and so does not match the description above -
        # confirm which credentials/project were intended.
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            project_name=self.os_creds.project_name)

        self.router_creator = create_router.OpenStackRouter(
            self.admin_os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router().id, router.id)

        self.check_router_recreation(router, router_settings)

    def test_create_delete_router(self):
        """
        Test that clean() will not raise an exception if the router is deleted
        by another process.
        """
        self.router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, self.router_settings)
        created_router = self.router_creator.create()
        self.assertIsNotNone(created_router)
        retrieved_router = neutron_utils.get_router(
            self.neutron, router_settings=self.router_settings)
        self.assertIsNotNone(retrieved_router)

        # Delete behind the creator's back, bypassing clean()
        neutron_utils.delete_router(self.neutron, created_router)

        retrieved_router = neutron_utils.get_router(
            self.neutron, router_settings=self.router_settings)
        self.assertIsNone(retrieved_router)

        # Should not raise an exception
        self.router_creator.clean()

    def test_create_router_admin_state_false(self):
        """
        Test creation of a basic router with admin state down.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', admin_state_up=False)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router(), router)

        self.check_router_recreation(router, router_settings)

    def test_create_router_admin_state_True(self):
        """
        Test creation of a basic router with admin state Up.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', admin_state_up=True)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router(), router)

        self.check_router_recreation(router, router_settings)

    def test_create_router_private_network(self):
        """
        Test creation of a router connected with two private networks and no
        external gateway
        """
        network_settings1 = NetworkConfig(
            name=self.guid + '-pub-net1',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet1',
                    gateway_ip=static_gateway_ip1)])
        network_settings2 = NetworkConfig(
            name=self.guid + '-pub-net2',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr2, name=self.guid + '-pub-subnet2',
                    gateway_ip=static_gateway_ip2)])

        self.network_creator1 = OpenStackNetwork(
            self.os_creds, network_settings1)
        self.network_creator2 = OpenStackNetwork(
            self.os_creds, network_settings2)

        self.network_creator1.create()
        self.network_creator2.create()

        # One router port per network, each pinned to its subnet's gateway IP
        port_settings = [
            create_network.PortConfig(
                name=self.guid + '-port1',
                ip_addrs=[{
                    'subnet_name':
                        network_settings1.subnet_settings[0].name,
                    'ip': static_gateway_ip1
                }],
                network_name=network_settings1.name),
            create_network.PortConfig(
                name=self.guid + '-port2',
                ip_addrs=[{
                    'subnet_name': network_settings2.subnet_settings[0].name,
                    'ip': static_gateway_ip2
                }],
                network_name=network_settings2.name)]

        router_settings = RouterConfig(
            name=self.guid + '-pub-router', port_settings=port_settings)
        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)

        self.assertEqual(router, self.router_creator.get_router())

        # Instantiate second identical creator to ensure a second router
        # has not been created
        router_creator2 = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        router2 = router_creator2.create()
        # The previous assertIsNotNone(first, router2) passed router2 as the
        # failure *message*, so nothing was ever compared; check that the
        # second creator really handed back the same router
        self.assertIsNotNone(router2)
        self.assertEqual(self.router_creator.get_router().id, router2.id)

        self.check_router_recreation(router2, router_settings)

    def test_create_router_external_network(self):
        """
        Test creation of a router connected to an external network and a
        private network.
        """
        network_settings = NetworkConfig(
            name=self.guid + '-pub-net1',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet1',
                    gateway_ip=static_gateway_ip1)])
        self.network_creator1 = OpenStackNetwork(
            self.os_creds, network_settings)
        self.network_creator1.create()

        port_settings = [
            create_network.PortConfig(
                name=self.guid + '-port1',
                ip_addrs=[{
                    'subnet_name': network_settings.subnet_settings[0].name,
                    'ip': static_gateway_ip1}],
                network_name=network_settings.name)]

        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            port_settings=port_settings)
        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, router_settings=router_settings)

        # assertEquals is a deprecated alias that Python 3.12 removed
        self.assertEqual(router, self.router_creator.get_router())

        self.check_router_recreation(router, router_settings)

    def check_router_recreation(self, router, orig_settings):
        """
        Validates the derived RouterConfig with the original
        :param router: the Router domain object to test
        :param orig_settings: the original RouterConfig object that was
                              responsible for creating the router
        :return: the derived RouterConfig object
        """
        derived_settings = settings_utils.create_router_config(
            self.neutron, router)
        self.assertIsNotNone(derived_settings)
        self.assertEqual(
            orig_settings.enable_snat, derived_settings.enable_snat)
        self.assertEqual(orig_settings.external_gateway,
                         derived_settings.external_gateway)
        self.assertEqual(orig_settings.name, derived_settings.name)
        self.assertEqual(orig_settings.internal_subnets,
                         derived_settings.internal_subnets)

        # The port count must match whether or not an external gateway is
        # configured (both branches of the former if/else were identical)
        orig_ports = orig_settings.port_settings
        derived_ports = derived_settings.port_settings
        self.assertEqual(len(orig_ports), len(derived_ports))

        # As before, only the names of the first two ports are compared
        for index in range(min(len(orig_ports), 2)):
            self.assertEqual(orig_ports[index].name,
                             derived_ports[index].name)

        return derived_settings
class CreateRouterNegativeTests(OSIntegrationTestCase):
    """
    Class for testing routers with various negative scenarios expected to fail.
    """

    def setUp(self):
        """
        Initializes objects used for router testing
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed
        super(CreateRouterNegativeTests, self).__start__()

        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.router_creator = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects used for router testing
        """
        # Only set when a test got as far as building a creator
        if self.router_creator:
            self.router_creator.clean()

        super(CreateRouterNegativeTests, self).__clean__()

    def test_create_router_noname(self):
        """
        Test creating a router without a name.
        """
        # The whole sequence stays inside the context manager so the test
        # passes wherever in it the RouterConfigError is raised
        with self.assertRaises(RouterConfigError):
            router_settings = RouterConfig(
                name=None, external_gateway=self.ext_net_name)
            self.router_creator = create_router.OpenStackRouter(
                self.os_creds, router_settings)
            self.router_creator.create()

    def test_create_router_invalid_gateway_name(self):
        """
        Test creating a router without a valid network gateway name.
        """
        with self.assertRaises(RouterConfigError):
            router_settings = RouterConfig(
                name=self.guid + '-pub-router',
                external_gateway="Invalid_name")
            self.router_creator = create_router.OpenStackRouter(
                self.os_creds, router_settings)
            self.router_creator.create()