1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.config.network import PortConfig, NetworkConfig, PortConfigError
19 from snaps.config.router import RouterConfigError, RouterConfig
20 from snaps.config.security_group import SecurityGroupConfig
21 from snaps.openstack import create_network
22 from snaps.openstack import create_router
23 from snaps.openstack.create_network import OpenStackNetwork
24 from snaps.openstack.create_router import (RouterSettings, OpenStackRouter,
26 from snaps.openstack.create_security_group import OpenStackSecurityGroup
27 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
28 from snaps.openstack.utils import neutron_utils, settings_utils, keystone_utils
30 __author__ = 'mmakati'
32 cidr1 = '10.200.201.0/24'
33 cidr2 = '10.200.202.0/24'
34 static_gateway_ip1 = '10.200.201.1'
35 static_gateway_ip2 = '10.200.202.1'
38 class RouterSettingsUnitTests(unittest.TestCase):
40 Class for testing the RouterSettings class
43 def test_no_params(self):
44 with self.assertRaises(RouterConfigError):
47 def test_empty_config(self):
48 with self.assertRaises(RouterConfigError):
49 RouterSettings(**dict())
51 def test_name_only(self):
52 settings = RouterSettings(name='foo')
53 self.assertEqual('foo', settings.name)
54 self.assertIsNone(settings.project_name)
55 self.assertIsNone(settings.external_gateway)
56 self.assertTrue(settings.admin_state_up)
57 self.assertIsNone(settings.enable_snat)
58 self.assertIsNotNone(settings.internal_subnets)
59 self.assertTrue(isinstance(settings.internal_subnets, list))
60 self.assertEqual(0, len(settings.internal_subnets))
61 self.assertIsNotNone(settings.port_settings)
62 self.assertTrue(isinstance(settings.port_settings, list))
63 self.assertEqual(0, len(settings.port_settings))
65 def test_config_with_name_only(self):
66 settings = RouterSettings(**{'name': 'foo'})
67 self.assertEqual('foo', settings.name)
68 self.assertIsNone(settings.project_name)
69 self.assertIsNone(settings.external_gateway)
70 self.assertTrue(settings.admin_state_up)
71 self.assertIsNone(settings.enable_snat)
72 self.assertIsNotNone(settings.internal_subnets)
73 self.assertTrue(isinstance(settings.internal_subnets, list))
74 self.assertEqual(0, len(settings.internal_subnets))
75 self.assertIsNotNone(settings.port_settings)
76 self.assertTrue(isinstance(settings.port_settings, list))
77 self.assertEqual(0, len(settings.port_settings))
80 port_settings = PortConfig(name='foo', network_name='bar')
81 settings = RouterSettings(
82 name='foo', project_name='bar', external_gateway='foo_gateway',
83 admin_state_up=True, enable_snat=False,
84 internal_subnets=['10.0.0.1/24'], interfaces=[port_settings])
85 self.assertEqual('foo', settings.name)
86 self.assertEqual('bar', settings.project_name)
87 self.assertEqual('foo_gateway', settings.external_gateway)
88 self.assertTrue(settings.admin_state_up)
89 self.assertFalse(settings.enable_snat)
90 self.assertIsNotNone(settings.internal_subnets)
91 self.assertTrue(isinstance(settings.internal_subnets, list))
92 self.assertEqual(1, len(settings.internal_subnets))
93 self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
94 self.assertEqual([port_settings], settings.port_settings)
96 def test_config_all(self):
97 settings = RouterSettings(
98 **{'name': 'foo', 'project_name': 'bar',
99 'external_gateway': 'foo_gateway', 'admin_state_up': True,
100 'enable_snat': False, 'internal_subnets': ['10.0.0.1/24'],
102 [{'port': {'name': 'foo-port',
103 'network_name': 'bar-net'}}]})
104 self.assertEqual('foo', settings.name)
105 self.assertEqual('bar', settings.project_name)
106 self.assertEqual('foo_gateway', settings.external_gateway)
107 self.assertTrue(settings.admin_state_up)
108 self.assertFalse(settings.enable_snat)
109 self.assertIsNotNone(settings.internal_subnets)
110 self.assertTrue(isinstance(settings.internal_subnets, list))
111 self.assertEqual(1, len(settings.internal_subnets))
112 self.assertEqual(['10.0.0.1/24'], settings.internal_subnets)
114 [PortConfig(**{'name': 'foo-port', 'network_name': 'bar-net'})],
115 settings.port_settings)
118 class CreateRouterSuccessTests(OSIntegrationTestCase):
120 Class for testing routers with various positive scenarios expected to
126 Initializes objects used for router testing
128 super(self.__class__, self).__start__()
130 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
131 self.router_creator = None
132 self.network_creator1 = None
133 self.network_creator2 = None
134 self.neutron = neutron_utils.neutron_client(
135 self.os_creds, self.os_session)
139 Cleans the remote OpenStack objects used for router testing
141 if self.router_creator:
142 self.router_creator.clean()
144 if self.network_creator1:
145 self.network_creator1.clean()
147 if self.network_creator2:
148 self.network_creator2.clean()
150 super(self.__class__, self).__clean__()
152 def test_create_router_vanilla(self):
154 Test creation of a most basic router with minimal options.
156 router_settings = RouterConfig(
157 name=self.guid + '-pub-router', external_gateway=self.ext_net_name)
159 self.router_creator = create_router.OpenStackRouter(
160 self.os_creds, router_settings)
161 self.router_creator.create()
163 router = neutron_utils.get_router(
164 self.neutron, self.keystone, router_settings=router_settings,
165 project_name=self.os_creds.project_name)
166 self.assertIsNotNone(router)
168 self.assertEqual(self.router_creator.get_router(), router)
170 self.check_router_recreation(router, router_settings)
172 def test_create_router_admin_user_to_new_project(self):
174 Test creation of a most basic router with the admin user pointing
177 router_settings = RouterConfig(
178 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
179 project_name=self.os_creds.project_name)
181 self.router_creator = create_router.OpenStackRouter(
182 self.admin_os_creds, router_settings)
183 self.router_creator.create()
185 router = neutron_utils.get_router(
186 self.neutron, self.keystone, router_settings=router_settings,
187 project_name=self.os_creds.project_name)
188 self.assertIsNotNone(router)
190 self.assertEqual(self.router_creator.get_router().id, router.id)
192 self.check_router_recreation(router, router_settings)
194 def test_create_router_new_user_as_admin_project(self):
196 Test creation of a most basic router with the new user pointing
197 to the admin project.
199 router_settings = RouterConfig(
200 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
201 project_name=self.os_creds.project_name)
203 self.router_creator = create_router.OpenStackRouter(
204 self.admin_os_creds, router_settings)
205 self.router_creator.create()
207 router = neutron_utils.get_router(
208 self.neutron, self.keystone, router_settings=router_settings,
209 project_name=self.os_creds.project_name)
210 self.assertIsNotNone(router)
212 self.assertEqual(self.router_creator.get_router().id, router.id)
214 self.check_router_recreation(router, router_settings)
216 def test_create_delete_router(self):
218 Test that clean() will not raise an exception if the router is deleted
221 self.router_settings = RouterConfig(
222 name=self.guid + '-pub-router', external_gateway=self.ext_net_name)
224 self.router_creator = create_router.OpenStackRouter(
225 self.os_creds, self.router_settings)
226 created_router = self.router_creator.create()
227 self.assertIsNotNone(created_router)
228 retrieved_router = neutron_utils.get_router(
229 self.neutron, self.keystone, router_settings=self.router_settings,
230 project_name=self.os_creds.project_name)
231 self.assertIsNotNone(retrieved_router)
233 neutron_utils.delete_router(self.neutron, created_router)
235 retrieved_router = neutron_utils.get_router(
236 self.neutron, self.keystone, router_settings=self.router_settings,
237 project_name=self.os_creds.project_name)
238 self.assertIsNone(retrieved_router)
240 # Should not raise an exception
241 self.router_creator.clean()
243 def test_create_with_internal_sub(self):
245 Test internal_subnets works.
247 network_settings1 = NetworkConfig(
248 name=self.guid + '-pub-net1',
250 create_network.SubnetConfig(
251 cidr=cidr1, name=self.guid + '-pub-subnet1',
252 gateway_ip=static_gateway_ip1)])
253 self.network_creator1 = OpenStackNetwork(self.os_creds,
256 self.network_creator1.create()
257 self.router_settings = RouterConfig(
258 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
259 internal_subnets=[network_settings1.subnet_settings[0].name])
261 self.router_creator = create_router.OpenStackRouter(
262 self.os_creds, self.router_settings)
263 created_router = self.router_creator.create()
264 self.assertIsNotNone(created_router)
266 def test_create_with_invalid_internal_sub(self):
268 Test adding an internal subnet owned by admin which should fail.
270 network_settings1 = NetworkConfig(
271 name=self.guid + '-pub-net1',
273 create_network.SubnetConfig(
274 cidr=cidr1, name=self.guid + '-pub-subnet1',
275 gateway_ip=static_gateway_ip1)])
276 self.network_creator1 = OpenStackNetwork(self.admin_os_creds,
279 self.network_creator1.create()
280 self.router_settings = RouterConfig(
281 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
282 internal_subnets=[network_settings1.subnet_settings[0].name])
284 self.router_creator = create_router.OpenStackRouter(
285 self.os_creds, self.router_settings)
287 with self.assertRaises(RouterCreationError):
288 created_router = self.router_creator.create()
290 def test_create_router_admin_state_false(self):
292 Test creation of a basic router with admin state down.
294 router_settings = RouterConfig(
295 name=self.guid + '-pub-router', admin_state_up=False)
297 self.router_creator = create_router.OpenStackRouter(self.os_creds,
299 self.router_creator.create()
301 router = neutron_utils.get_router(
302 self.neutron, self.keystone, router_settings=router_settings,
303 project_name=self.os_creds.project_name)
304 self.assertIsNotNone(router)
306 self.assertEqual(self.router_creator.get_router(), router)
308 self.check_router_recreation(router, router_settings)
310 def test_create_router_admin_state_True(self):
312 Test creation of a basic router with admin state Up.
314 router_settings = RouterConfig(
315 name=self.guid + '-pub-router', admin_state_up=True)
317 self.router_creator = create_router.OpenStackRouter(
318 self.os_creds, router_settings)
319 self.router_creator.create()
321 router = neutron_utils.get_router(
322 self.neutron, self.keystone, router_settings=router_settings,
323 project_name=self.os_creds.project_name)
324 self.assertIsNotNone(router)
326 self.assertEqual(self.router_creator.get_router(), router)
328 self.check_router_recreation(router, router_settings)
330 def test_create_router_private_network(self):
332 Test creation of a router connected with two private networks and no
335 network_settings1 = NetworkConfig(
336 name=self.guid + '-pub-net1',
338 create_network.SubnetConfig(
339 cidr=cidr1, name=self.guid + '-pub-subnet1',
340 gateway_ip=static_gateway_ip1)])
341 network_settings2 = NetworkConfig(
342 name=self.guid + '-pub-net2',
344 create_network.SubnetConfig(
345 cidr=cidr2, name=self.guid + '-pub-subnet2',
346 gateway_ip=static_gateway_ip2)])
348 self.network_creator1 = OpenStackNetwork(self.os_creds,
350 self.network_creator2 = OpenStackNetwork(self.os_creds,
353 self.network_creator1.create()
354 self.network_creator2.create()
357 create_network.PortConfig(
358 name=self.guid + '-port1',
361 network_settings1.subnet_settings[0].name,
362 'ip': static_gateway_ip1
364 network_name=network_settings1.name),
365 create_network.PortConfig(
366 name=self.guid + '-port2',
368 'subnet_name': network_settings2.subnet_settings[0].name,
369 'ip': static_gateway_ip2
371 network_name=network_settings2.name)]
373 router_settings = RouterConfig(
374 name=self.guid + '-pub-router', port_settings=port_settings)
375 self.router_creator = create_router.OpenStackRouter(
376 self.os_creds, router_settings)
377 self.router_creator.create()
379 router = neutron_utils.get_router(
380 self.neutron, self.keystone, router_settings=router_settings,
381 project_name=self.os_creds.project_name)
383 self.assertEqual(router, self.router_creator.get_router())
385 # Instantiate second identical creator to ensure a second router
386 # has not been created
387 router_creator2 = create_router.OpenStackRouter(
388 self.os_creds, router_settings)
389 router2 = router_creator2.create()
390 self.assertIsNotNone(self.router_creator.get_router(), router2)
392 self.check_router_recreation(router2, router_settings)
394 def test_create_router_external_network(self):
396 Test creation of a router connected to an external network and a
399 network_settings = NetworkConfig(
400 name=self.guid + '-pub-net1',
402 create_network.SubnetConfig(
403 cidr=cidr1, name=self.guid + '-pub-subnet1',
404 gateway_ip=static_gateway_ip1)])
405 self.network_creator1 = OpenStackNetwork(self.os_creds,
407 self.network_creator1.create()
410 create_network.PortConfig(
411 name=self.guid + '-port1',
413 'subnet_name': network_settings.subnet_settings[0].name,
414 'ip': static_gateway_ip1}],
415 network_name=network_settings.name)]
417 router_settings = RouterConfig(
418 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
419 port_settings=port_settings)
420 self.router_creator = create_router.OpenStackRouter(
421 self.os_creds, router_settings)
422 self.router_creator.create()
424 router = neutron_utils.get_router(
425 self.neutron, self.keystone, router_settings=router_settings,
426 project_name=self.os_creds.project_name)
428 self.assertEquals(router, self.router_creator.get_router())
430 self.check_router_recreation(router, router_settings)
432 def test_create_router_with_ext_port(self):
434 Test creation of a router with a port to an external network as an
438 create_network.PortConfig(
439 name=self.guid + '-port1',
440 network_name=self.ext_net_name)]
442 router_settings = RouterConfig(
443 name=self.guid + '-pub-router', port_settings=port_settings)
444 self.router_creator = create_router.OpenStackRouter(
445 self.admin_os_creds, router_settings)
446 self.router_creator.create()
448 admin_neutron = neutron_utils.neutron_client(
449 self.admin_os_creds, self.admin_os_session)
450 admin_keystone = keystone_utils.keystone_client(
451 self.admin_os_creds, self.admin_os_session)
452 router = neutron_utils.get_router(
453 admin_neutron, admin_keystone, router_settings=router_settings,
454 project_name=self.admin_os_creds.project_name)
456 self.assertIsNotNone(router)
457 self.assertEquals(router, self.router_creator.get_router())
459 ext_net = neutron_utils.get_network(
460 admin_neutron, admin_keystone, network_name=self.ext_net_name)
462 self.assertIsNotNone(ext_net)
463 self.assertIsNotNone(router.port_subnets)
466 for port, subnets in router.port_subnets:
467 self.assertIsNotNone(subnets)
468 self.assertIsNotNone(port)
470 if ext_net.id == port.network_id:
472 for subnet in subnets:
473 self.assertIsNotNone(subnet)
474 self.assertEqual(ext_net.id, subnet.network_id)
475 self.assertTrue(id_found)
477 def check_router_recreation(self, router, orig_settings):
479 Validates the derived RouterConfig with the original
480 :param router: the Router domain object to test
481 :param orig_settings: the original RouterConfig object that was
482 responsible for creating the router
483 :return: the derived RouterConfig object
485 derived_settings = settings_utils.create_router_config(
486 self.neutron, router)
487 self.assertIsNotNone(derived_settings)
489 orig_settings.enable_snat, derived_settings.enable_snat)
490 self.assertEqual(orig_settings.external_gateway,
491 derived_settings.external_gateway)
492 self.assertEqual(orig_settings.name, derived_settings.name)
493 self.assertEqual(orig_settings.internal_subnets,
494 derived_settings.internal_subnets)
496 if orig_settings.external_gateway:
497 self.assertEqual(len(orig_settings.port_settings),
498 len(derived_settings.port_settings))
500 self.assertEqual(len(orig_settings.port_settings),
501 len(derived_settings.port_settings))
503 if len(orig_settings.port_settings) > 0:
504 self.assertEqual(orig_settings.port_settings[0].name,
505 derived_settings.port_settings[0].name)
507 if len(orig_settings.port_settings) > 1:
508 self.assertEqual(orig_settings.port_settings[1].name,
509 derived_settings.port_settings[1].name)
511 return derived_settings
514 class CreateRouterNegativeTests(OSIntegrationTestCase):
516 Class for testing routers with various negative scenarios expected to fail.
521 Initializes objects used for router testing
523 super(self.__class__, self).__start__()
525 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
526 self.network_creator = None
527 self.router_creator = None
531 Cleans the remote OpenStack objects used for router testing
533 if self.router_creator:
534 self.router_creator.clean()
536 if self.network_creator:
537 self.network_creator.clean()
539 super(self.__class__, self).__clean__()
541 def test_create_router_noname(self):
543 Test creating a router without a name.
545 with self.assertRaises(RouterConfigError):
546 router_settings = RouterConfig(
547 name=None, external_gateway=self.ext_net_name)
548 self.router_creator = create_router.OpenStackRouter(
549 self.os_creds, router_settings)
550 self.router_creator.create()
552 def test_create_router_invalid_gateway_name(self):
554 Test creating a router without a valid network gateway name.
556 with self.assertRaises(RouterConfigError):
557 router_settings = RouterConfig(
558 name=self.guid + '-pub-router',
559 external_gateway="Invalid_name")
560 self.router_creator = create_router.OpenStackRouter(
561 self.os_creds, router_settings)
562 self.router_creator.create()
564 def test_create_router_admin_ports(self):
566 Test creation of a router with ports to subnets owned by the admin
569 network_settings = NetworkConfig(
570 name=self.guid + '-pub-net1',
572 create_network.SubnetConfig(
573 cidr=cidr1, name=self.guid + '-pub-subnet1',
574 gateway_ip=static_gateway_ip1)])
575 self.network_creator = OpenStackNetwork(
576 self.admin_os_creds, network_settings)
577 self.network_creator.create()
580 create_network.PortConfig(
581 name=self.guid + '-port1',
583 'subnet_name': network_settings.subnet_settings[0].name,
584 'ip': static_gateway_ip1}],
585 network_name=network_settings.name)]
587 router_settings = RouterConfig(
588 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
589 port_settings=port_settings)
590 self.router_creator = create_router.OpenStackRouter(
591 self.os_creds, router_settings)
593 with self.assertRaises(PortConfigError):
594 self.router_creator.create()
597 class CreateMultipleRouterTests(OSIntegrationTestCase):
599 Test for the OpenStackRouter class and how it interacts with routers
600 groups within other projects with the same name
605 Initializes objects used for router testing
607 super(self.__class__, self).__start__()
609 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
610 self.admin_router_creator = None
611 self.proj_router_creator = None
612 self.neutron = neutron_utils.neutron_client(
613 self.os_creds, self.os_session)
615 network_settings = NetworkConfig(
616 name=self.guid + '-pub-net', shared=True,
618 create_network.SubnetConfig(
619 cidr=cidr1, name=self.guid + '-pub-subnet',
620 gateway_ip=static_gateway_ip1)])
622 self.network_creator = OpenStackNetwork(
623 self.admin_os_creds, network_settings)
624 self.network_creator.create()
628 Cleans the remote OpenStack objects used for router testing
630 if self.admin_router_creator:
631 self.admin_router_creator.clean()
633 if self.proj_router_creator:
634 self.proj_router_creator.clean()
636 if self.network_creator:
637 self.network_creator.clean()
639 super(self.__class__, self).__clean__()
641 def test_router_same_name_diff_proj(self):
643 Tests the creation of an OpenStackNetwork with the same name
644 within a different project/tenant when not configured but implied by
649 router_config = RouterConfig(name=self.guid + '-router')
650 self.admin_router_creator = OpenStackRouter(
651 self.admin_os_creds, router_config)
652 self.admin_router_creator.create()
654 self.proj_router_creator = OpenStackRouter(
655 self.os_creds, router_config)
656 self.proj_router_creator.create()
659 self.admin_router_creator.get_router().id,
660 self.proj_router_creator.get_router().id)
662 admin_creator2 = OpenStackRouter(
663 self.admin_os_creds, router_config)
664 admin_creator2.create()
666 self.admin_router_creator.get_router(),
667 admin_creator2.get_router())
669 proj_creator2 = OpenStackRouter(self.os_creds, router_config)
670 proj_creator2.create()
671 self.assertEqual(self.proj_router_creator.get_router(),
672 proj_creator2.get_router())
674 def test_router_create_by_admin_to_different_project(self):
676 Tests the creation of an OpenStackRouter by the admin user and
677 initialize again with tenant credentials.
681 admin_router_config = RouterConfig(
682 name=self.guid + '-router',
683 project_name=self.os_creds.project_name)
685 self.admin_router_creator = OpenStackRouter(
686 self.admin_os_creds, admin_router_config)
687 self.admin_router_creator.create()
689 proj_router_config = RouterConfig(
690 name=self.guid + '-router',
691 project_name=self.os_creds.project_name)
693 self.proj_router_creator = OpenStackRouter(
694 self.os_creds, proj_router_config)
695 self.proj_router_creator.create()
698 self.admin_router_creator.get_router().id,
699 self.proj_router_creator.get_router().id)
702 class CreateRouterSecurityGroupTests(OSIntegrationTestCase):
704 Class for testing routers with ports containing security groups
709 Initializes objects used for router testing
711 super(self.__class__, self).__start__()
713 self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
714 self.router_creator = None
715 self.network_creator = None
717 self.sec_grp_creator = OpenStackSecurityGroup(
718 self.os_creds, SecurityGroupConfig(name=self.guid + '-sec_grp'))
719 self.sec_grp_creator.create()
721 self.neutron = neutron_utils.neutron_client(
722 self.os_creds, self.os_session)
726 Cleans the remote OpenStack objects used for router testing
728 if self.router_creator:
729 self.router_creator.clean()
731 if self.network_creator:
732 self.network_creator.clean()
734 if self.sec_grp_creator:
735 self.sec_grp_creator.clean()
737 super(self.__class__, self).__clean__()
739 def test_create_router_secure_port(self):
741 Test creation of a router with a port that has a security group.
743 network_settings = NetworkConfig(
744 name=self.guid + '-pub-net1',
746 create_network.SubnetConfig(
747 cidr=cidr1, name=self.guid + '-pub-subnet1')])
748 self.network_creator = OpenStackNetwork(
749 self.os_creds, network_settings)
750 self.network_creator.create()
753 create_network.PortConfig(
754 name=self.guid + '-port1',
756 'subnet_name': network_settings.subnet_settings[0].name,
757 'ip': static_gateway_ip1}],
758 network_name=network_settings.name,
759 security_groups=[self.sec_grp_creator.sec_grp_settings.name])]
761 router_settings = RouterConfig(
762 name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
763 port_settings=port_settings)
764 self.router_creator = create_router.OpenStackRouter(
765 self.os_creds, router_settings)
766 self.router_creator.create()