1 # Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
18 from snaps.openstack.create_network import OpenStackNetwork, NetworkSettings, SubnetSettings, PortSettings
19 from snaps.openstack import create_router
20 from snaps.openstack.tests import openstack_tests
21 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase, OSComponentTestCase
22 from snaps.openstack.utils import neutron_utils
23 from snaps.openstack.utils.tests import neutron_utils_tests
25 __author__ = 'spisarski'
28 class NetworkSettingsUnitTests(unittest.TestCase):
30 Tests the construction of the NetworkSettings class
33 def test_no_params(self):
34 with self.assertRaises(Exception):
37 def test_empty_config(self):
38 with self.assertRaises(Exception):
39 NetworkSettings(config=dict())
41 def test_name_only(self):
42 settings = NetworkSettings(name='foo')
43 self.assertEquals('foo', settings.name)
44 self.assertTrue(settings.admin_state_up)
45 self.assertIsNone(settings.shared)
46 self.assertIsNone(settings.project_name)
47 self.assertFalse(settings.external)
48 self.assertIsNone(settings.network_type)
49 self.assertEquals(0, len(settings.subnet_settings))
51 def test_config_with_name_only(self):
52 settings = NetworkSettings(config={'name': 'foo'})
53 self.assertEquals('foo', settings.name)
54 self.assertTrue(settings.admin_state_up)
55 self.assertIsNone(settings.shared)
56 self.assertIsNone(settings.project_name)
57 self.assertFalse(settings.external)
58 self.assertIsNone(settings.network_type)
59 self.assertEquals(0, len(settings.subnet_settings))
62 sub_settings = SubnetSettings(name='foo-subnet', cidr='10.0.0.0/24')
63 settings = NetworkSettings(name='foo', admin_state_up=False, shared=True, project_name='bar', external=True,
64 network_type='flat', physical_network='phy', subnet_settings=[sub_settings])
65 self.assertEquals('foo', settings.name)
66 self.assertFalse(settings.admin_state_up)
67 self.assertTrue(settings.shared)
68 self.assertEquals('bar', settings.project_name)
69 self.assertTrue(settings.external)
70 self.assertEquals('flat', settings.network_type)
71 self.assertEquals('phy', settings.physical_network)
72 self.assertEquals(1, len(settings.subnet_settings))
73 self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
75 def test_config_all(self):
76 settings = NetworkSettings(config={'name': 'foo', 'admin_state_up': False, 'shared': True,
77 'project_name': 'bar', 'external': True, 'network_type': 'flat',
78 'physical_network': 'phy',
80 [{'subnet': {'name': 'foo-subnet', 'cidr': '10.0.0.0/24'}}]})
81 self.assertEquals('foo', settings.name)
82 self.assertFalse(settings.admin_state_up)
83 self.assertTrue(settings.shared)
84 self.assertEquals('bar', settings.project_name)
85 self.assertTrue(settings.external)
86 self.assertEquals('flat', settings.network_type)
87 self.assertEquals('phy', settings.physical_network)
88 self.assertEquals(1, len(settings.subnet_settings))
89 self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
92 class SubnetSettingsUnitTests(unittest.TestCase):
94 Tests the construction of the SubnetSettings class
97 def test_no_params(self):
98 with self.assertRaises(Exception):
101 def test_empty_config(self):
102 with self.assertRaises(Exception):
103 SubnetSettings(config=dict())
105 def test_name_only(self):
106 with self.assertRaises(Exception):
107 SubnetSettings(name='foo')
109 def test_config_with_name_only(self):
110 with self.assertRaises(Exception):
111 SubnetSettings(config={'name': 'foo'})
113 def test_name_cidr_only(self):
114 settings = SubnetSettings(name='foo', cidr='10.0.0.0/24')
115 self.assertEquals('foo', settings.name)
116 self.assertEquals('10.0.0.0/24', settings.cidr)
117 self.assertEquals(4, settings.ip_version)
118 self.assertIsNone(settings.project_name)
119 self.assertIsNone(settings.start)
120 self.assertIsNone(settings.end)
121 self.assertIsNone(settings.enable_dhcp)
122 self.assertEquals(1, len(settings.dns_nameservers))
123 self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
124 self.assertIsNone(settings.host_routes)
125 self.assertIsNone(settings.destination)
126 self.assertIsNone(settings.nexthop)
127 self.assertIsNone(settings.ipv6_ra_mode)
128 self.assertIsNone(settings.ipv6_address_mode)
130 def test_config_with_name_cidr_only(self):
131 settings = SubnetSettings(config={'name': 'foo', 'cidr': '10.0.0.0/24'})
132 self.assertEquals('foo', settings.name)
133 self.assertEquals('10.0.0.0/24', settings.cidr)
134 self.assertEquals(4, settings.ip_version)
135 self.assertIsNone(settings.project_name)
136 self.assertIsNone(settings.start)
137 self.assertIsNone(settings.end)
138 self.assertIsNone(settings.gateway_ip)
139 self.assertIsNone(settings.enable_dhcp)
140 self.assertEquals(1, len(settings.dns_nameservers))
141 self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
142 self.assertIsNone(settings.host_routes)
143 self.assertIsNone(settings.destination)
144 self.assertIsNone(settings.nexthop)
145 self.assertIsNone(settings.ipv6_ra_mode)
146 self.assertIsNone(settings.ipv6_address_mode)
149 host_routes = {'destination': '0.0.0.0/0', 'nexthop': '123.456.78.9'}
150 settings = SubnetSettings(name='foo', cidr='10.0.0.0/24', ip_version=6, project_name='bar-project',
151 start='10.0.0.2', end='10.0.0.101', gateway_ip='10.0.0.1', enable_dhcp=False,
152 dns_nameservers=['8.8.8.8'], host_routes=[host_routes], destination='dest',
153 nexthop='hop', ipv6_ra_mode='dhcpv6-stateful', ipv6_address_mode='slaac')
154 self.assertEquals('foo', settings.name)
155 self.assertEquals('10.0.0.0/24', settings.cidr)
156 self.assertEquals(6, settings.ip_version)
157 self.assertEquals('bar-project', settings.project_name)
158 self.assertEquals('10.0.0.2', settings.start)
159 self.assertEquals('10.0.0.101', settings.end)
160 self.assertEquals('10.0.0.1', settings.gateway_ip)
161 self.assertEquals(False, settings.enable_dhcp)
162 self.assertEquals(1, len(settings.dns_nameservers))
163 self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
164 self.assertEquals(1, len(settings.host_routes))
165 self.assertEquals(host_routes, settings.host_routes[0])
166 self.assertEquals('dest', settings.destination)
167 self.assertEquals('hop', settings.nexthop)
168 self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
169 self.assertEquals('slaac', settings.ipv6_address_mode)
171 def test_config_all(self):
172 host_routes = {'destination': '0.0.0.0/0', 'nexthop': '123.456.78.9'}
173 settings = SubnetSettings(config={'name': 'foo', 'cidr': '10.0.0.0/24', 'ip_version': 6,
174 'project_name': 'bar-project', 'start': '10.0.0.2', 'end': '10.0.0.101',
175 'gateway_ip': '10.0.0.1', 'enable_dhcp': False,
176 'dns_nameservers': ['8.8.8.8'], 'host_routes': [host_routes],
177 'destination': 'dest', 'nexthop': 'hop', 'ipv6_ra_mode': 'dhcpv6-stateful',
178 'ipv6_address_mode': 'slaac'})
179 self.assertEquals('foo', settings.name)
180 self.assertEquals('10.0.0.0/24', settings.cidr)
181 self.assertEquals(6, settings.ip_version)
182 self.assertEquals('bar-project', settings.project_name)
183 self.assertEquals('10.0.0.2', settings.start)
184 self.assertEquals('10.0.0.101', settings.end)
185 self.assertEquals('10.0.0.1', settings.gateway_ip)
186 self.assertEquals(False, settings.enable_dhcp)
187 self.assertEquals(1, len(settings.dns_nameservers))
188 self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
189 self.assertEquals(1, len(settings.host_routes))
190 self.assertEquals(host_routes, settings.host_routes[0])
191 self.assertEquals('dest', settings.destination)
192 self.assertEquals('hop', settings.nexthop)
193 self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
194 self.assertEquals('slaac', settings.ipv6_address_mode)
197 class PortSettingsUnitTests(unittest.TestCase):
199 Tests the construction of the PortSettings class
202 def test_no_params(self):
203 with self.assertRaises(Exception):
206 def test_empty_config(self):
207 with self.assertRaises(Exception):
208 PortSettings(config=dict())
210 def test_name_only(self):
211 with self.assertRaises(Exception):
212 PortSettings(name='foo')
214 def test_config_name_only(self):
215 with self.assertRaises(Exception):
216 PortSettings(config={'name': 'foo'})
218 def test_name_netname_only(self):
219 settings = PortSettings(name='foo', network_name='bar')
220 self.assertEquals('foo', settings.name)
221 self.assertEquals('bar', settings.network_name)
222 self.assertTrue(settings.admin_state_up)
223 self.assertIsNone(settings.project_name)
224 self.assertIsNone(settings.mac_address)
225 self.assertIsNone(settings.ip_addrs)
226 self.assertIsNone(settings.fixed_ips)
227 self.assertIsNone(settings.security_groups)
228 self.assertIsNone(settings.allowed_address_pairs)
229 self.assertIsNone(settings.opt_value)
230 self.assertIsNone(settings.opt_name)
231 self.assertIsNone(settings.device_owner)
232 self.assertIsNone(settings.device_id)
234 def test_config_with_name_netname_only(self):
235 settings = PortSettings(config={'name': 'foo', 'network_name': 'bar'})
236 self.assertEquals('foo', settings.name)
237 self.assertEquals('bar', settings.network_name)
238 self.assertTrue(settings.admin_state_up)
239 self.assertIsNone(settings.project_name)
240 self.assertIsNone(settings.mac_address)
241 self.assertIsNone(settings.ip_addrs)
242 self.assertIsNone(settings.fixed_ips)
243 self.assertIsNone(settings.security_groups)
244 self.assertIsNone(settings.allowed_address_pairs)
245 self.assertIsNone(settings.opt_value)
246 self.assertIsNone(settings.opt_name)
247 self.assertIsNone(settings.device_owner)
248 self.assertIsNone(settings.device_id)
251 ip_addrs = [{'subnet_name', 'foo-sub', 'ip', '10.0.0.10'}]
252 fixed_ips = {'sub_id', '10.0.0.10'}
253 allowed_address_pairs = {'10.0.0.101', '1234.5678'}
255 settings = PortSettings(name='foo', network_name='bar', admin_state_up=False, project_name='foo-project',
256 mac_address='1234', ip_addrs=ip_addrs, fixed_ips=fixed_ips,
257 security_groups=['foo_grp_id'], allowed_address_pairs=allowed_address_pairs,
258 opt_value='opt value', opt_name='opt name', device_owner='owner',
259 device_id='device number')
260 self.assertEquals('foo', settings.name)
261 self.assertEquals('bar', settings.network_name)
262 self.assertFalse(settings.admin_state_up)
263 self.assertEquals('foo-project', settings.project_name)
264 self.assertEquals('1234', settings.mac_address)
265 self.assertEquals(ip_addrs, settings.ip_addrs)
266 self.assertEquals(fixed_ips, settings.fixed_ips)
267 self.assertEquals(1, len(settings.security_groups))
268 self.assertEquals('foo_grp_id', settings.security_groups[0])
269 self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
270 self.assertEquals('opt value', settings.opt_value)
271 self.assertEquals('opt name', settings.opt_name)
272 self.assertEquals('owner', settings.device_owner)
273 self.assertEquals('device number', settings.device_id)
275 def test_config_all(self):
276 ip_addrs = [{'subnet_name', 'foo-sub', 'ip', '10.0.0.10'}]
277 fixed_ips = {'sub_id', '10.0.0.10'}
278 allowed_address_pairs = {'10.0.0.101', '1234.5678'}
280 settings = PortSettings(config={'name': 'foo', 'network_name': 'bar', 'admin_state_up': False,
281 'project_name': 'foo-project', 'mac_address': '1234', 'ip_addrs': ip_addrs,
282 'fixed_ips': fixed_ips, 'security_groups': ['foo_grp_id'],
283 'allowed_address_pairs': allowed_address_pairs, 'opt_value': 'opt value',
284 'opt_name': 'opt name', 'device_owner': 'owner', 'device_id': 'device number'})
285 self.assertEquals('foo', settings.name)
286 self.assertEquals('bar', settings.network_name)
287 self.assertFalse(settings.admin_state_up)
288 self.assertEquals('foo-project', settings.project_name)
289 self.assertEquals('1234', settings.mac_address)
290 self.assertEquals(ip_addrs, settings.ip_addrs)
291 self.assertEquals(fixed_ips, settings.fixed_ips)
292 self.assertEquals(1, len(settings.security_groups))
293 self.assertEquals('foo_grp_id', settings.security_groups[0])
294 self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
295 self.assertEquals('opt value', settings.opt_value)
296 self.assertEquals('opt name', settings.opt_name)
297 self.assertEquals('owner', settings.device_owner)
298 self.assertEquals('device number', settings.device_id)
301 class CreateNetworkSuccessTests(OSIntegrationTestCase):
303 Test for the CreateNework class defined in create_nework.py
308 Sets up object for test
310 super(self.__class__, self).__start__()
312 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
313 self.net_config = openstack_tests.get_pub_net_config(
314 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
315 router_name=guid + '-pub-router', external_net=self.ext_net_name)
317 self.neutron = neutron_utils.neutron_client(self.os_creds)
319 # Initialize for cleanup
320 self.net_creator = None
321 self.router_creator = None
322 self.neutron = neutron_utils.neutron_client(self.os_creds)
328 if self.router_creator:
329 self.router_creator.clean()
332 if len(self.net_creator.get_subnets()) > 0:
333 # Validate subnet has been deleted
334 neutron_utils_tests.validate_subnet(
335 self.neutron, self.net_creator.network_settings.subnet_settings[0].name,
336 self.net_creator.network_settings.subnet_settings[0].cidr, False)
338 if self.net_creator.get_network():
339 # Validate network has been deleted
340 neutron_utils_tests.validate_network(self.neutron, self.net_creator.network_settings.name,
342 self.net_creator.clean()
344 super(self.__class__, self).__clean__()
346 def test_create_network_without_router(self):
348 Tests the creation of an OpenStack network without a router.
351 self.net_creator = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
352 self.net_creator.create()
354 # Validate network was created
355 neutron_utils_tests.validate_network(self.neutron, self.net_creator.network_settings.name, True)
358 neutron_utils_tests.validate_subnet(
359 self.neutron, self.net_creator.network_settings.subnet_settings[0].name,
360 self.net_creator.network_settings.subnet_settings[0].cidr, True)
362 def test_create_delete_network(self):
364 Tests the creation of an OpenStack network, it's deletion, then cleanup.
367 self.net_creator = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
368 self.net_creator.create()
370 # Validate network was created
371 neutron_utils_tests.validate_network(self.neutron, self.net_creator.network_settings.name, True)
373 neutron_utils.delete_network(self.neutron, self.net_creator.get_network())
374 self.assertIsNone(neutron_utils.get_network(self.neutron, self.net_creator.network_settings.name))
376 # This shall not throw an exception here
377 self.net_creator.clean()
379 def test_create_network_with_router(self):
381 Tests the creation of an OpenStack network with a router.
384 self.net_creator = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
385 self.net_creator.create()
388 self.router_creator = create_router.OpenStackRouter(self.os_creds, self.net_config.router_settings)
389 self.router_creator.create()
391 # Validate network was created
392 neutron_utils_tests.validate_network(self.neutron, self.net_creator.network_settings.name, True)
395 neutron_utils_tests.validate_subnet(
396 self.neutron, self.net_creator.network_settings.subnet_settings[0].name,
397 self.net_creator.network_settings.subnet_settings[0].cidr, True)
400 neutron_utils_tests.validate_router(self.neutron, self.router_creator.router_settings.name, True)
402 neutron_utils_tests.validate_interface_router(self.router_creator.get_internal_router_interface(),
403 self.router_creator.get_router(),
404 self.net_creator.get_subnets()[0])
406 def test_create_networks_same_name(self):
408 Tests the creation of an OpenStack network and ensures that the OpenStackNetwork object will not create
412 self.net_creator = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
413 self.net_creator.create()
415 self.net_creator2 = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
416 self.net_creator2.create()
418 self.assertEquals(self.net_creator.get_network()['network']['id'],
419 self.net_creator2.get_network()['network']['id'])
422 class CreateNetworkTypeTests(OSComponentTestCase):
424 Test for the CreateNework class defined in create_nework.py for testing creating networks of different types
429 Sets up object for test
431 guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
432 self.net_config = openstack_tests.get_pub_net_config(
433 net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet')
435 self.neutron = neutron_utils.neutron_client(self.os_creds)
437 # Initialize for cleanup
438 self.net_creator = None
439 self.neutron = neutron_utils.neutron_client(self.os_creds)
446 if len(self.net_creator.get_subnets()) > 0:
447 # Validate subnet has been deleted
448 neutron_utils_tests.validate_subnet(
449 self.neutron, self.net_creator.network_settings.subnet_settings[0].name,
450 self.net_creator.network_settings.subnet_settings[0].cidr, False)
452 if self.net_creator.get_network():
453 # Validate network has been deleted
454 neutron_utils_tests.validate_network(self.neutron, self.net_creator.network_settings.name,
456 self.net_creator.clean()
457 # TODO - determine why this is not working on Newton
458 # - Unable to create the network. No tenant network is available for allocation.
459 # def test_create_network_type_vlan(self):
461 # Tests the creation of an OpenStack network of type vlan.
464 # network_type = 'vlan'
465 # net_settings = NetworkSettings(name=self.net_config.network_settings.name,
466 # subnet_settings=self.net_config.network_settings.subnet_settings,
467 # network_type=network_type)
469 # # When setting the network_type, creds must be admin
470 # self.net_creator = OpenStackNetwork(self.os_creds, net_settings)
471 # network = self.net_creator.create()
473 # # Validate network was created
474 # neutron_utils_tests.validate_network(self.neutron, net_settings.name, True)
476 # self.assertEquals(network_type, network['network']['provider:network_type'])
478 def test_create_network_type_vxlan(self):
480 Tests the creation of an OpenStack network of type vxlan.
483 network_type = 'vxlan'
484 net_settings = NetworkSettings(name=self.net_config.network_settings.name,
485 subnet_settings=self.net_config.network_settings.subnet_settings,
486 network_type=network_type)
488 # When setting the network_type, creds must be admin
489 self.net_creator = OpenStackNetwork(self.os_creds, net_settings)
490 network = self.net_creator.create()
492 # Validate network was created
493 neutron_utils_tests.validate_network(self.neutron, net_settings.name, True)
495 self.assertEquals(network_type, network['network']['provider:network_type'])
497 # TODO - determine what value we need to place into physical_network
498 # - Do not know what vaule to place into the 'physical_network' setting.
499 # def test_create_network_type_flat(self):
501 # Tests the creation of an OpenStack network of type flat.
504 # network_type = 'flat'
506 # # Unable to find documentation on how to find a value that will work here.
507 # # https://visibilityspots.org/vlan-flat-neutron-provider.html
508 # # https://community.rackspace.com/products/f/45/t/4225
509 # # It appears that this may be due to how OPNFV is configuring OpenStack.
510 # physical_network = '???'
511 # net_settings = NetworkSettings(name=self.net_config.network_settings.name,
512 # subnet_settings=self.net_config.network_settings.subnet_settings,
513 # network_type=network_type, physical_network=physical_network)
514 # self.net_creator = OpenStackNetwork(self.os_creds, net_settings)
515 # network = self.net_creator.create()
517 # # Validate network was created
518 # neutron_utils_tests.validate_network(self.neutron, net_settings.name, True)
520 # self.assertEquals(network_type, network['network']['provider:network_type'])
522 def test_create_network_type_foo(self):
524 Tests the creation of an OpenStack network of type foo which should raise an exception.
528 net_settings = NetworkSettings(name=self.net_config.network_settings.name,
529 subnet_settings=self.net_config.network_settings.subnet_settings,
530 network_type=network_type)
531 self.net_creator = OpenStackNetwork(self.os_creds, net_settings)
532 with self.assertRaises(Exception):
533 self.net_creator.create()