1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.config.network import PortConfig, NetworkConfig, PortConfigError
19 from snaps.config.router import RouterConfigError, RouterConfig
20 from snaps.config.security_group import SecurityGroupConfig
21 from snaps.openstack import create_network
22 from snaps.openstack import create_router
23 from snaps.openstack.create_network import OpenStackNetwork
24 from snaps.openstack.create_router import RouterSettings, OpenStackRouter
25 from snaps.openstack.create_security_group import OpenStackSecurityGroup
26 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
27 from snaps.openstack.utils import neutron_utils, settings_utils
__author__ = 'mmakati'

# CIDR blocks given to the private subnets the integration tests create.
cidr1 = '10.200.201.0/24'
cidr2 = '10.200.202.0/24'
# Addresses inside cidr1 / cidr2. The tests pass them both as the subnet
# gateway_ip and as the fixed IP of the router ports placed on those subnets.
static_gateway_ip1 = '10.200.201.1'
static_gateway_ip2 = '10.200.202.1'
class RouterSettingsUnitTests(unittest.TestCase):
    """
    Class for testing the RouterSettings class: the required name, the
    default attribute values, and keyword versus dict-style construction
    """

    def _check_name_only(self, settings):
        """
        Asserts the state of a RouterSettings object that was built with
        nothing but the name 'foo'
        :param settings: the RouterSettings object to inspect
        """
        self.assertEqual('foo', settings.name)
        self.assertIsNone(settings.project_name)
        self.assertIsNone(settings.external_gateway)
        self.assertTrue(settings.admin_state_up)
        self.assertIsNone(settings.enable_snat)
        # Both collections must exist as empty lists
        for members in (settings.internal_subnets, settings.port_settings):
            self.assertIsNotNone(members)
            self.assertIsInstance(members, list)
            self.assertEqual(0, len(members))

    def _check_fully_configured(self, settings, expected_ports):
        """
        Asserts the state of a RouterSettings object that was built with
        every attribute used by the "all" tests
        :param settings: the RouterSettings object to inspect
        :param expected_ports: the list of PortConfig objects that
                               settings.port_settings must equal
        """
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.project_name)
        self.assertEqual('foo_gateway', settings.external_gateway)
        self.assertTrue(settings.admin_state_up)
        self.assertFalse(settings.enable_snat)
        subnets = settings.internal_subnets
        self.assertIsNotNone(subnets)
        self.assertIsInstance(subnets, list)
        self.assertEqual(1, len(subnets))
        self.assertEqual(['10.0.0.1/24'], subnets)
        self.assertEqual(expected_ports, settings.port_settings)

    def test_no_params(self):
        """
        A RouterSettings object cannot be built without arguments
        """
        with self.assertRaises(RouterConfigError):
            RouterSettings()

    def test_empty_config(self):
        """
        A RouterSettings object cannot be built from an empty dict
        """
        with self.assertRaises(RouterConfigError):
            RouterSettings(**{})

    def test_name_only(self):
        """
        Only the name is supplied, as a keyword argument
        """
        self._check_name_only(RouterSettings(name='foo'))

    def test_config_with_name_only(self):
        """
        Only the name is supplied, through an unpacked dict
        """
        self._check_name_only(RouterSettings(**{'name': 'foo'}))

    def test_all(self):
        """
        Every attribute is supplied as a keyword argument
        """
        port = PortConfig(name='foo', network_name='bar')
        settings = RouterSettings(
            name='foo', project_name='bar', external_gateway='foo_gateway',
            admin_state_up=True, enable_snat=False,
            internal_subnets=['10.0.0.1/24'], interfaces=[port])
        self._check_fully_configured(settings, [port])

    def test_config_all(self):
        """
        Every attribute is supplied through an unpacked dict, with the
        interface port described by a nested dict
        """
        config = {
            'name': 'foo', 'project_name': 'bar',
            'external_gateway': 'foo_gateway', 'admin_state_up': True,
            'enable_snat': False, 'internal_subnets': ['10.0.0.1/24'],
            'interfaces': [
                {'port': {'name': 'foo-port', 'network_name': 'bar-net'}}],
        }
        settings = RouterSettings(**config)
        self._check_fully_configured(
            settings,
            [PortConfig(**{'name': 'foo-port', 'network_name': 'bar-net'})])
class CreateRouterSuccessTests(OSIntegrationTestCase):
    """
    Class for testing routers with various positive scenarios expected to
    succeed
    """

    def setUp(self):
        """
        Initializes objects used for router testing
        """
        # The class is named explicitly: super(self.__class__, self) recurses
        # without end as soon as this test case is subclassed.
        super(CreateRouterSuccessTests, self).__start__()

        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.router_creator = None
        self.network_creator1 = None
        self.network_creator2 = None
        self.neutron = neutron_utils.neutron_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects used for router testing
        """
        if self.router_creator:
            self.router_creator.clean()

        if self.network_creator1:
            self.network_creator1.clean()

        if self.network_creator2:
            self.network_creator2.clean()

        super(CreateRouterSuccessTests, self).__clean__()

    def test_create_router_vanilla(self):
        """
        Test creation of a most basic router with minimal options.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router(), router)

        self.check_router_recreation(router, router_settings)

    def test_create_router_admin_user_to_new_project(self):
        """
        Test creation of a most basic router with the admin user pointing
        to the new project.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            project_name=self.os_creds.project_name)

        self.router_creator = create_router.OpenStackRouter(
            self.admin_os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router().id, router.id)

        self.check_router_recreation(router, router_settings)

    def test_create_router_new_user_as_admin_project(self):
        """
        Test creation of a most basic router with the new user pointing
        to the admin project.
        """
        # NOTE(review): this body is identical to
        # test_create_router_admin_user_to_new_project (admin credentials,
        # project_name of the test project), which does not match the
        # docstring above - confirm which scenario is intended.
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            project_name=self.os_creds.project_name)

        self.router_creator = create_router.OpenStackRouter(
            self.admin_os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router().id, router.id)

        self.check_router_recreation(router, router_settings)

    def test_create_delete_router(self):
        """
        Test that clean() will not raise an exception if the router is deleted
        by another process.
        """
        self.router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, self.router_settings)
        created_router = self.router_creator.create()
        self.assertIsNotNone(created_router)
        retrieved_router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=self.router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNotNone(retrieved_router)

        # Remove the router behind the creator's back
        neutron_utils.delete_router(self.neutron, created_router)

        retrieved_router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=self.router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNone(retrieved_router)

        # Should not raise an exception
        self.router_creator.clean()

    def test_create_router_admin_state_false(self):
        """
        Test creation of a basic router with admin state down.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', admin_state_up=False)

        self.router_creator = create_router.OpenStackRouter(self.os_creds,
                                                            router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router(), router)

        self.check_router_recreation(router, router_settings)

    def test_create_router_admin_state_True(self):
        """
        Test creation of a basic router with admin state Up.
        """
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', admin_state_up=True)

        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)
        self.assertIsNotNone(router)

        self.assertEqual(self.router_creator.get_router(), router)

        self.check_router_recreation(router, router_settings)

    def test_create_router_private_network(self):
        """
        Test creation of a router connected with two private networks and no
        external gateway
        """
        network_settings1 = NetworkConfig(
            name=self.guid + '-pub-net1',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet1',
                    gateway_ip=static_gateway_ip1)])
        network_settings2 = NetworkConfig(
            name=self.guid + '-pub-net2',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr2, name=self.guid + '-pub-subnet2',
                    gateway_ip=static_gateway_ip2)])

        self.network_creator1 = OpenStackNetwork(self.os_creds,
                                                 network_settings1)
        self.network_creator2 = OpenStackNetwork(self.os_creds,
                                                 network_settings2)

        self.network_creator1.create()
        self.network_creator2.create()

        port_settings = [
            create_network.PortConfig(
                name=self.guid + '-port1',
                ip_addrs=[{
                    'subnet_name':
                        network_settings1.subnet_settings[0].name,
                    'ip': static_gateway_ip1
                }],
                network_name=network_settings1.name),
            create_network.PortConfig(
                name=self.guid + '-port2',
                ip_addrs=[{
                    'subnet_name': network_settings2.subnet_settings[0].name,
                    'ip': static_gateway_ip2
                }],
                network_name=network_settings2.name)]

        router_settings = RouterConfig(
            name=self.guid + '-pub-router', port_settings=port_settings)
        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)

        self.assertEqual(router, self.router_creator.get_router())

        # Instantiate second identical creator to ensure a second router
        # has not been created
        router_creator2 = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        router2 = router_creator2.create()
        # Compare identities; assertIsNotNone(a, b) would only use b as the
        # failure message and never compare the two routers.
        self.assertEqual(self.router_creator.get_router().id, router2.id)

        self.check_router_recreation(router2, router_settings)

    def test_create_router_external_network(self):
        """
        Test creation of a router connected to an external network and a
        private network.
        """
        network_settings = NetworkConfig(
            name=self.guid + '-pub-net1',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet1',
                    gateway_ip=static_gateway_ip1)])
        self.network_creator1 = OpenStackNetwork(self.os_creds,
                                                 network_settings)
        self.network_creator1.create()

        port_settings = [
            create_network.PortConfig(
                name=self.guid + '-port1',
                ip_addrs=[{
                    'subnet_name': network_settings.subnet_settings[0].name,
                    'ip': static_gateway_ip1}],
                network_name=network_settings.name)]

        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            port_settings=port_settings)
        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        self.router_creator.create()

        router = neutron_utils.get_router(
            self.neutron, self.keystone, router_settings=router_settings,
            project_name=self.os_creds.project_name)

        self.assertEqual(router, self.router_creator.get_router())

        self.check_router_recreation(router, router_settings)

    def check_router_recreation(self, router, orig_settings):
        """
        Validates the derived RouterConfig with the original
        :param router: the Router domain object to test
        :param orig_settings: the original RouterConfig object that was
                              responsible for creating the router
        :return: the derived RouterConfig object
        """
        derived_settings = settings_utils.create_router_config(
            self.neutron, router)
        self.assertIsNotNone(derived_settings)
        self.assertEqual(
            orig_settings.enable_snat, derived_settings.enable_snat)
        self.assertEqual(orig_settings.external_gateway,
                         derived_settings.external_gateway)
        self.assertEqual(orig_settings.name, derived_settings.name)
        self.assertEqual(orig_settings.internal_subnets,
                         derived_settings.internal_subnets)

        # The port count must match with or without an external gateway
        self.assertEqual(len(orig_settings.port_settings),
                         len(derived_settings.port_settings))

        # The lengths are known to be equal here, so zip() pairs every port
        for orig_port, derived_port in zip(orig_settings.port_settings,
                                           derived_settings.port_settings):
            self.assertEqual(orig_port.name, derived_port.name)

        return derived_settings
class CreateRouterNegativeTests(OSIntegrationTestCase):
    """
    Class for testing routers with various negative scenarios expected to fail.
    """

    def setUp(self):
        """
        Initializes objects used for router testing
        """
        # The class is named explicitly: super(self.__class__, self) recurses
        # without end as soon as this test case is subclassed.
        super(CreateRouterNegativeTests, self).__start__()

        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.network_creator = None
        self.router_creator = None

    def tearDown(self):
        """
        Cleans the remote OpenStack objects used for router testing
        """
        if self.router_creator:
            self.router_creator.clean()

        if self.network_creator:
            self.network_creator.clean()

        super(CreateRouterNegativeTests, self).__clean__()

    def test_create_router_noname(self):
        """
        Test creating a router without a name.
        """
        with self.assertRaises(RouterConfigError):
            router_settings = RouterConfig(
                name=None, external_gateway=self.ext_net_name)
            self.router_creator = create_router.OpenStackRouter(
                self.os_creds, router_settings)
            self.router_creator.create()

    def test_create_router_invalid_gateway_name(self):
        """
        Test creating a router without a valid network gateway name.
        """
        with self.assertRaises(RouterConfigError):
            router_settings = RouterConfig(
                name=self.guid + '-pub-router',
                external_gateway="Invalid_name")
            self.router_creator = create_router.OpenStackRouter(
                self.os_creds, router_settings)
            self.router_creator.create()

    def test_create_router_admin_ports(self):
        """
        Test creation of a router with ports to subnets owned by the admin
        project
        """
        network_settings = NetworkConfig(
            name=self.guid + '-pub-net1',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet1',
                    gateway_ip=static_gateway_ip1)])
        # The network is created with the admin credentials ...
        self.network_creator = OpenStackNetwork(
            self.admin_os_creds, network_settings)
        self.network_creator.create()

        port_settings = [
            create_network.PortConfig(
                name=self.guid + '-port1',
                ip_addrs=[{
                    'subnet_name': network_settings.subnet_settings[0].name,
                    'ip': static_gateway_ip1}],
                network_name=network_settings.name)]

        # ... while the router is created with the test project credentials
        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            port_settings=port_settings)
        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)

        with self.assertRaises(PortConfigError):
            self.router_creator.create()
class CreateMultipleRouterTests(OSIntegrationTestCase):
    """
    Test for the OpenStackRouter class and how it interacts with routers
    within other projects with the same name
    """

    def setUp(self):
        """
        Initializes objects used for router testing
        """
        # The class is named explicitly: super(self.__class__, self) recurses
        # without end as soon as this test case is subclassed.
        super(CreateMultipleRouterTests, self).__start__()

        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.admin_router_creator = None
        self.proj_router_creator = None
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        network_settings = NetworkConfig(
            name=self.guid + '-pub-net', shared=True,
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet',
                    gateway_ip=static_gateway_ip1)])

        self.network_creator = OpenStackNetwork(
            self.admin_os_creds, network_settings)
        self.network_creator.create()

    def tearDown(self):
        """
        Cleans the remote OpenStack objects used for router testing
        """
        if self.admin_router_creator:
            self.admin_router_creator.clean()

        if self.proj_router_creator:
            self.proj_router_creator.clean()

        if self.network_creator:
            self.network_creator.clean()

        super(CreateMultipleRouterTests, self).__clean__()

    def test_router_same_name_diff_proj(self):
        """
        Tests the creation of an OpenStackRouter with the same name
        within a different project/tenant when not configured but implied by
        the OSCreds.
        """
        # Create Router

        router_config = RouterConfig(name=self.guid + '-router')
        self.admin_router_creator = OpenStackRouter(
            self.admin_os_creds, router_config)
        self.admin_router_creator.create()

        self.proj_router_creator = OpenStackRouter(
            self.os_creds, router_config)
        self.proj_router_creator.create()

        # Same name under different credentials must yield distinct routers
        self.assertNotEqual(
            self.admin_router_creator.get_router().id,
            self.proj_router_creator.get_router().id)

        # A second creator with the same credentials must find the same router
        admin_creator2 = OpenStackRouter(
            self.admin_os_creds, router_config)
        admin_creator2.create()
        self.assertEqual(
            self.admin_router_creator.get_router(),
            admin_creator2.get_router())

        proj_creator2 = OpenStackRouter(self.os_creds, router_config)
        proj_creator2.create()
        self.assertEqual(self.proj_router_creator.get_router(),
                         proj_creator2.get_router())

    def test_router_create_by_admin_to_different_project(self):
        """
        Tests the creation of an OpenStackRouter by the admin user and
        initialize again with tenant credentials.
        """
        # Create Router

        admin_router_config = RouterConfig(
            name=self.guid + '-router',
            project_name=self.os_creds.project_name)

        self.admin_router_creator = OpenStackRouter(
            self.admin_os_creds, admin_router_config)
        self.admin_router_creator.create()

        proj_router_config = RouterConfig(
            name=self.guid + '-router',
            project_name=self.os_creds.project_name)

        self.proj_router_creator = OpenStackRouter(
            self.os_creds, proj_router_config)
        self.proj_router_creator.create()

        # Both configurations name the same project, so the tenant creator
        # is expected to resolve to the router the admin user created
        self.assertEqual(
            self.admin_router_creator.get_router().id,
            self.proj_router_creator.get_router().id)
class CreateRouterSecurityGroupTests(OSIntegrationTestCase):
    """
    Class for testing routers with ports containing security groups
    """

    def setUp(self):
        """
        Initializes objects used for router testing
        """
        # The class is named explicitly: super(self.__class__, self) recurses
        # without end as soon as this test case is subclassed.
        super(CreateRouterSecurityGroupTests, self).__start__()

        self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.router_creator = None
        self.network_creator = None

        # The security group later attached to the router port
        self.sec_grp_creator = OpenStackSecurityGroup(
            self.os_creds, SecurityGroupConfig(name=self.guid + '-sec_grp'))
        self.sec_grp_creator.create()

        self.neutron = neutron_utils.neutron_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects used for router testing
        """
        if self.router_creator:
            self.router_creator.clean()

        if self.network_creator:
            self.network_creator.clean()

        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateRouterSecurityGroupTests, self).__clean__()

    def test_create_router_secure_port(self):
        """
        Test creation of a router with a port that has a security group.
        """
        network_settings = NetworkConfig(
            name=self.guid + '-pub-net1',
            subnet_settings=[
                create_network.SubnetConfig(
                    cidr=cidr1, name=self.guid + '-pub-subnet1')])
        self.network_creator = OpenStackNetwork(
            self.os_creds, network_settings)
        self.network_creator.create()

        port_settings = [
            create_network.PortConfig(
                name=self.guid + '-port1',
                ip_addrs=[{
                    'subnet_name': network_settings.subnet_settings[0].name,
                    'ip': static_gateway_ip1}],
                network_name=network_settings.name,
                security_groups=[self.sec_grp_creator.sec_grp_settings.name])]

        router_settings = RouterConfig(
            name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
            port_settings=port_settings)
        self.router_creator = create_router.OpenStackRouter(
            self.os_creds, router_settings)
        # The test passes when creation completes without raising
        self.router_creator.create()