1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.config.network import PortConfig, NetworkConfig, PortConfigError
19 from snaps.config.router import RouterConfigError, RouterConfig
20 from snaps.config.security_group import SecurityGroupConfig
21 from snaps.openstack import create_network
22 from snaps.openstack import create_router
23 from snaps.openstack.create_network import OpenStackNetwork
24 from snaps.openstack.create_router import RouterSettings, OpenStackRouter
25 from snaps.openstack.create_security_group import OpenStackSecurityGroup
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils, settings_utils
29 __author__ = 'mmakati'
31 cidr1 = '10.200.201.0/24'
32 cidr2 = '10.200.202.0/24'
33 static_gateway_ip1 = '10.200.201.1'
34 static_gateway_ip2 = '10.200.202.1'
37 class RouterSettingsUnitTests(unittest.TestCase):
39 Class for testing the RouterSettings class
42 def test_no_params(self):
43 with self.assertRaises(RouterConfigError):
46 def test_empty_config(self):
47 with self.assertRaises(RouterConfigError):
48 RouterSettings(**dict())
50 def test_name_only(self):
51 settings = RouterSettings(name='foo')
52 self.assertEqual('foo', settings.name)
53 self.assertIsNone(settings.project_name)
54 self.assertIsNone(settings.external_gateway)
55 self.assertTrue(settings.admin_state_up)
56 self.assertIsNone(settings.enable_snat)
57 self.assertIsNotNone(settings.internal_subnets)
58 self.assertTrue(isinstance(settings.internal_subnets, list))
59 self.assertEqual(0, len(settings.internal_subnets))
60 self.assertIsNotNone(settings.port_settings)
61 self.assertTrue(isinstance(settings.port_settings, list))
62 self.assertEqual(0, len(settings.port_settings))
64 def test_config_with_name_only(self):
65 settings = RouterSettings(**{'name': 'foo'})
66 self.assertEqual('foo', settings.name)
67 self.assertIsNone(settings.project_name)
68 self.assertIsNone(settings.external_gateway)
69 self.assertTrue(settings.admin_state_up)
70 self.assertIsNone(settings.enable_snat)
71 self.assertIsNotNone(settings.internal_subnets)
72 self.assertTrue(isinstance(settings.internal_subnets, list))
73 self.assertEqual(0, len(settings.internal_subnets))
74 self.assertIsNotNone(settings.port_settings)
75 self.assertTrue(isinstance(settings.port_settings, list))
76 self.assertEqual(0, len(settings.port_settings))
79 port_settings = PortConfig(name='foo', network_name='bar')
80 settings = RouterSettings(
81 name='foo', project_name='bar', external_gateway='foo_gateway',
82 admin_state_up=True, enable_snat=False,
83 internal_subnets=['10.0.0.1/24'], interfaces=[port_settings])
84 self.assertEqual('foo', settings.name)
85 self.assertEqual('bar', settings.project_name)
86 self.assertEqual('foo_gateway', settings.external_gateway)
87 self.assertTrue(settings.admin_state_up)
88 self.assertFalse(settings.enable_snat)
89 self.assertIsNotNone(settings.internal_subnets)
90 self.assertTrue(isinstance(settings.internal_subnets, list))
91 self.assertEqual(1, len(settings.internal_subnets))
92 self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
93 self.assertEqual([port_settings], settings.port_settings)
95 def test_config_all(self):
96 settings = RouterSettings(
97 **{'name': 'foo', 'project_name': 'bar',
98 'external_gateway': 'foo_gateway', 'admin_state_up': True,
99 'enable_snat': False, 'internal_subnets': ['10.0.0.1/24'],
101 [{'port': {'name': 'foo-port',
102 'network_name': 'bar-net'}}]})
103 self.assertEqual('foo', settings.name)
104 self.assertEqual('bar', settings.project_name)
105 self.assertEqual('foo_gateway', settings.external_gateway)
106 self.assertTrue(settings.admin_state_up)
107 self.assertFalse(settings.enable_snat)
108 self.assertIsNotNone(settings.internal_subnets)
109 self.assertTrue(isinstance(settings.internal_subnets, list))
110 self.assertEqual(1, len(settings.internal_subnets))
111 self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
113 [PortConfig(**{'name': 'foo-port', 'network_name': 'bar-net'})],
114 settings.port_settings)
117 class CreateRouterSuccessTests(OSIntegrationTestCase):
119 Class for testing routers with various positive scenarios expected to
125 Initializes objects used for router testing
127 super(self.__class__, self).__start__()
129 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
130 self.router_creator = None
131 self.network_creator1 = None
132 self.network_creator2 = None
133 self.neutron = neutron_utils.neutron_client(
134 self.os_creds, self.os_session)
138 Cleans the remote OpenStack objects used for router testing
140 if self.router_creator:
141 self.router_creator.clean()
143 if self.network_creator1:
144 self.network_creator1.clean()
146 if self.network_creator2:
147 self.network_creator2.clean()
149 super(self.__class__, self).__clean__()
151 def test_create_router_vanilla(self):
153 Test creation of a most basic router with minimal options.
155 router_settings = RouterConfig(
156 name=self.guid + '-pub-router', external_gateway=self.ext_net_name)
158 self.router_creator = create_router.OpenStackRouter(
159 self.os_creds, router_settings)
160 self.router_creator.create()
162 router = neutron_utils.get_router(
163 self.neutron, self.keystone, router_settings=router_settings,
164 project_name=self.os_creds.project_name)
165 self.assertIsNotNone(router)
167 self.assertEqual(self.router_creator.get_router(), router)
169 self.check_router_recreation(router, router_settings)
171 def test_create_router_admin_user_to_new_project(self):
173 Test creation of a most basic router with the admin user pointing
176 router_settings = RouterConfig(
177 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
178 project_name=self.os_creds.project_name)
180 self.router_creator = create_router.OpenStackRouter(
181 self.admin_os_creds, router_settings)
182 self.router_creator.create()
184 router = neutron_utils.get_router(
185 self.neutron, self.keystone, router_settings=router_settings,
186 project_name=self.os_creds.project_name)
187 self.assertIsNotNone(router)
189 self.assertEqual(self.router_creator.get_router().id, router.id)
191 self.check_router_recreation(router, router_settings)
193 def test_create_router_new_user_as_admin_project(self):
195 Test creation of a most basic router with the new user pointing
196 to the admin project.
198 router_settings = RouterConfig(
199 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
200 project_name=self.os_creds.project_name)
202 self.router_creator = create_router.OpenStackRouter(
203 self.admin_os_creds, router_settings)
204 self.router_creator.create()
206 router = neutron_utils.get_router(
207 self.neutron, self.keystone, router_settings=router_settings,
208 project_name=self.os_creds.project_name)
209 self.assertIsNotNone(router)
211 self.assertEqual(self.router_creator.get_router().id, router.id)
213 self.check_router_recreation(router, router_settings)
215 def test_create_delete_router(self):
217 Test that clean() will not raise an exception if the router is deleted
220 self.router_settings = RouterConfig(
221 name=self.guid + '-pub-router', external_gateway=self.ext_net_name)
223 self.router_creator = create_router.OpenStackRouter(
224 self.os_creds, self.router_settings)
225 created_router = self.router_creator.create()
226 self.assertIsNotNone(created_router)
227 retrieved_router = neutron_utils.get_router(
228 self.neutron, self.keystone, router_settings=self.router_settings,
229 project_name=self.os_creds.project_name)
230 self.assertIsNotNone(retrieved_router)
232 neutron_utils.delete_router(self.neutron, created_router)
234 retrieved_router = neutron_utils.get_router(
235 self.neutron, self.keystone, router_settings=self.router_settings,
236 project_name=self.os_creds.project_name)
237 self.assertIsNone(retrieved_router)
239 # Should not raise an exception
240 self.router_creator.clean()
242 def test_create_router_admin_state_false(self):
244 Test creation of a basic router with admin state down.
246 router_settings = RouterConfig(
247 name=self.guid + '-pub-router', admin_state_up=False)
249 self.router_creator = create_router.OpenStackRouter(self.os_creds,
251 self.router_creator.create()
253 router = neutron_utils.get_router(
254 self.neutron, self.keystone, router_settings=router_settings,
255 project_name=self.os_creds.project_name)
256 self.assertIsNotNone(router)
258 self.assertEqual(self.router_creator.get_router(), router)
260 self.check_router_recreation(router, router_settings)
262 def test_create_router_admin_state_True(self):
264 Test creation of a basic router with admin state Up.
266 router_settings = RouterConfig(
267 name=self.guid + '-pub-router', admin_state_up=True)
269 self.router_creator = create_router.OpenStackRouter(
270 self.os_creds, router_settings)
271 self.router_creator.create()
273 router = neutron_utils.get_router(
274 self.neutron, self.keystone, router_settings=router_settings,
275 project_name=self.os_creds.project_name)
276 self.assertIsNotNone(router)
278 self.assertEqual(self.router_creator.get_router(), router)
280 self.check_router_recreation(router, router_settings)
282 def test_create_router_private_network(self):
284 Test creation of a router connected with two private networks and no
287 network_settings1 = NetworkConfig(
288 name=self.guid + '-pub-net1',
290 create_network.SubnetConfig(
291 cidr=cidr1, name=self.guid + '-pub-subnet1',
292 gateway_ip=static_gateway_ip1)])
293 network_settings2 = NetworkConfig(
294 name=self.guid + '-pub-net2',
296 create_network.SubnetConfig(
297 cidr=cidr2, name=self.guid + '-pub-subnet2',
298 gateway_ip=static_gateway_ip2)])
300 self.network_creator1 = OpenStackNetwork(self.os_creds,
302 self.network_creator2 = OpenStackNetwork(self.os_creds,
305 self.network_creator1.create()
306 self.network_creator2.create()
309 create_network.PortConfig(
310 name=self.guid + '-port1',
313 network_settings1.subnet_settings[0].name,
314 'ip': static_gateway_ip1
316 network_name=network_settings1.name),
317 create_network.PortConfig(
318 name=self.guid + '-port2',
320 'subnet_name': network_settings2.subnet_settings[0].name,
321 'ip': static_gateway_ip2
323 network_name=network_settings2.name)]
325 router_settings = RouterConfig(
326 name=self.guid + '-pub-router', port_settings=port_settings)
327 self.router_creator = create_router.OpenStackRouter(
328 self.os_creds, router_settings)
329 self.router_creator.create()
331 router = neutron_utils.get_router(
332 self.neutron, self.keystone, router_settings=router_settings,
333 project_name=self.os_creds.project_name)
335 self.assertEqual(router, self.router_creator.get_router())
337 # Instantiate second identical creator to ensure a second router
338 # has not been created
339 router_creator2 = create_router.OpenStackRouter(
340 self.os_creds, router_settings)
341 router2 = router_creator2.create()
342 self.assertIsNotNone(self.router_creator.get_router(), router2)
344 self.check_router_recreation(router2, router_settings)
346 def test_create_router_external_network(self):
348 Test creation of a router connected to an external network and a
351 network_settings = NetworkConfig(
352 name=self.guid + '-pub-net1',
354 create_network.SubnetConfig(
355 cidr=cidr1, name=self.guid + '-pub-subnet1',
356 gateway_ip=static_gateway_ip1)])
357 self.network_creator1 = OpenStackNetwork(self.os_creds,
359 self.network_creator1.create()
362 create_network.PortConfig(
363 name=self.guid + '-port1',
365 'subnet_name': network_settings.subnet_settings[0].name,
366 'ip': static_gateway_ip1}],
367 network_name=network_settings.name)]
369 router_settings = RouterConfig(
370 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
371 port_settings=port_settings)
372 self.router_creator = create_router.OpenStackRouter(
373 self.os_creds, router_settings)
374 self.router_creator.create()
376 router = neutron_utils.get_router(
377 self.neutron, self.keystone, router_settings=router_settings,
378 project_name=self.os_creds.project_name)
380 self.assertEquals(router, self.router_creator.get_router())
382 self.check_router_recreation(router, router_settings)
384 def check_router_recreation(self, router, orig_settings):
386 Validates the derived RouterConfig with the original
387 :param router: the Router domain object to test
388 :param orig_settings: the original RouterConfig object that was
389 responsible for creating the router
390 :return: the derived RouterConfig object
392 derived_settings = settings_utils.create_router_config(
393 self.neutron, router)
394 self.assertIsNotNone(derived_settings)
396 orig_settings.enable_snat, derived_settings.enable_snat)
397 self.assertEqual(orig_settings.external_gateway,
398 derived_settings.external_gateway)
399 self.assertEqual(orig_settings.name, derived_settings.name)
400 self.assertEqual(orig_settings.internal_subnets,
401 derived_settings.internal_subnets)
403 if orig_settings.external_gateway:
404 self.assertEqual(len(orig_settings.port_settings),
405 len(derived_settings.port_settings))
407 self.assertEqual(len(orig_settings.port_settings),
408 len(derived_settings.port_settings))
410 if len(orig_settings.port_settings) > 0:
411 self.assertEqual(orig_settings.port_settings[0].name,
412 derived_settings.port_settings[0].name)
414 if len(orig_settings.port_settings) > 1:
415 self.assertEqual(orig_settings.port_settings[1].name,
416 derived_settings.port_settings[1].name)
418 return derived_settings
421 class CreateRouterNegativeTests(OSIntegrationTestCase):
423 Class for testing routers with various negative scenarios expected to fail.
428 Initializes objects used for router testing
430 super(self.__class__, self).__start__()
432 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
433 self.network_creator = None
434 self.router_creator = None
438 Cleans the remote OpenStack objects used for router testing
440 if self.router_creator:
441 self.router_creator.clean()
443 if self.network_creator:
444 self.network_creator.clean()
446 super(self.__class__, self).__clean__()
448 def test_create_router_noname(self):
450 Test creating a router without a name.
452 with self.assertRaises(RouterConfigError):
453 router_settings = RouterConfig(
454 name=None, external_gateway=self.ext_net_name)
455 self.router_creator = create_router.OpenStackRouter(
456 self.os_creds, router_settings)
457 self.router_creator.create()
459 def test_create_router_invalid_gateway_name(self):
461 Test creating a router without a valid network gateway name.
463 with self.assertRaises(RouterConfigError):
464 router_settings = RouterConfig(
465 name=self.guid + '-pub-router',
466 external_gateway="Invalid_name")
467 self.router_creator = create_router.OpenStackRouter(
468 self.os_creds, router_settings)
469 self.router_creator.create()
471 def test_create_router_admin_ports(self):
473 Test creation of a router with ports to subnets owned by the admin
476 network_settings = NetworkConfig(
477 name=self.guid + '-pub-net1',
479 create_network.SubnetConfig(
480 cidr=cidr1, name=self.guid + '-pub-subnet1',
481 gateway_ip=static_gateway_ip1)])
482 self.network_creator = OpenStackNetwork(
483 self.admin_os_creds, network_settings)
484 self.network_creator.create()
487 create_network.PortConfig(
488 name=self.guid + '-port1',
490 'subnet_name': network_settings.subnet_settings[0].name,
491 'ip': static_gateway_ip1}],
492 network_name=network_settings.name)]
494 router_settings = RouterConfig(
495 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
496 port_settings=port_settings)
497 self.router_creator = create_router.OpenStackRouter(
498 self.os_creds, router_settings)
500 with self.assertRaises(PortConfigError):
501 self.router_creator.create()
504 class CreateMultipleRouterTests(OSIntegrationTestCase):
506 Test for the OpenStackRouter class and how it interacts with routers
507 groups within other projects with the same name
512 Initializes objects used for router testing
514 super(self.__class__, self).__start__()
516 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
517 self.admin_router_creator = None
518 self.proj_router_creator = None
519 self.neutron = neutron_utils.neutron_client(
520 self.os_creds, self.os_session)
522 network_settings = NetworkConfig(
523 name=self.guid + '-pub-net', shared=True,
525 create_network.SubnetConfig(
526 cidr=cidr1, name=self.guid + '-pub-subnet',
527 gateway_ip=static_gateway_ip1)])
529 self.network_creator = OpenStackNetwork(
530 self.admin_os_creds, network_settings)
531 self.network_creator.create()
535 Cleans the remote OpenStack objects used for router testing
537 if self.admin_router_creator:
538 self.admin_router_creator.clean()
540 if self.proj_router_creator:
541 self.proj_router_creator.clean()
543 if self.network_creator:
544 self.network_creator.clean()
546 super(self.__class__, self).__clean__()
548 def test_router_same_name_diff_proj(self):
550 Tests the creation of an OpenStackNetwork with the same name
551 within a different project/tenant when not configured but implied by
556 router_config = RouterConfig(name=self.guid + '-router')
557 self.admin_router_creator = OpenStackRouter(
558 self.admin_os_creds, router_config)
559 self.admin_router_creator.create()
561 self.proj_router_creator = OpenStackRouter(
562 self.os_creds, router_config)
563 self.proj_router_creator.create()
566 self.admin_router_creator.get_router().id,
567 self.proj_router_creator.get_router().id)
569 admin_creator2 = OpenStackRouter(
570 self.admin_os_creds, router_config)
571 admin_creator2.create()
573 self.admin_router_creator.get_router(),
574 admin_creator2.get_router())
576 proj_creator2 = OpenStackRouter(self.os_creds, router_config)
577 proj_creator2.create()
578 self.assertEqual(self.proj_router_creator.get_router(),
579 proj_creator2.get_router())
581 def test_router_create_by_admin_to_different_project(self):
583 Tests the creation of an OpenStackRouter by the admin user and
584 initialize again with tenant credentials.
588 admin_router_config = RouterConfig(
589 name=self.guid + '-router',
590 project_name=self.os_creds.project_name)
592 self.admin_router_creator = OpenStackRouter(
593 self.admin_os_creds, admin_router_config)
594 self.admin_router_creator.create()
596 proj_router_config = RouterConfig(
597 name=self.guid + '-router',
598 project_name=self.os_creds.project_name)
600 self.proj_router_creator = OpenStackRouter(
601 self.os_creds, proj_router_config)
602 self.proj_router_creator.create()
605 self.admin_router_creator.get_router().id,
606 self.proj_router_creator.get_router().id)
609 class CreateRouterSecurityGroupTests(OSIntegrationTestCase):
611 Class for testing routers with ports containing security groups
616 Initializes objects used for router testing
618 super(self.__class__, self).__start__()
620 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
621 self.router_creator = None
622 self.network_creator = None
624 self.sec_grp_creator = OpenStackSecurityGroup(
625 self.os_creds, SecurityGroupConfig(name=self.guid + '-sec_grp'))
626 self.sec_grp_creator.create()
628 self.neutron = neutron_utils.neutron_client(
629 self.os_creds, self.os_session)
633 Cleans the remote OpenStack objects used for router testing
635 if self.router_creator:
636 self.router_creator.clean()
638 if self.network_creator:
639 self.network_creator.clean()
641 if self.sec_grp_creator:
642 self.sec_grp_creator.clean()
644 super(self.__class__, self).__clean__()
646 def test_create_router_secure_port(self):
648 Test creation of a router with a port that has a security group.
650 network_settings = NetworkConfig(
651 name=self.guid + '-pub-net1',
653 create_network.SubnetConfig(
654 cidr=cidr1, name=self.guid + '-pub-subnet1')])
655 self.network_creator = OpenStackNetwork(
656 self.os_creds, network_settings)
657 self.network_creator.create()
660 create_network.PortConfig(
661 name=self.guid + '-port1',
663 'subnet_name': network_settings.subnet_settings[0].name,
664 'ip': static_gateway_ip1}],
665 network_name=network_settings.name,
666 security_groups=[self.sec_grp_creator.sec_grp_settings.name])]
668 router_settings = RouterConfig(
669 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
670 port_settings=port_settings)
671 self.router_creator = create_router.OpenStackRouter(
672 self.os_creds, router_settings)
673 self.router_creator.create()