1 # Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from snaps.openstack.utils import keystone_utils
18 from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
19 from snaps.openstack.tests import openstack_tests
20 from snaps.openstack.utils import neutron_utils
21 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
22 from snaps.openstack import create_router
23 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
24 from snaps.openstack.tests import validation_utils
26 __author__ = 'spisarski'
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests verifying that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and checks that the configured external network is listed.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listed = client.list_networks().get('networks')

        ext_net_listed = any(net.get('name') == self.ext_net_name for net in listed)
        self.assertTrue(ext_net_listed)

    def test_neutron_connect_fail(self):
        """
        Checks that bogus credentials cannot be used to list networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bad_creds = OSCreds(username='user', password='pass', auth_url='url', project_name='project')
            client = neutron_utils.neutron_client(bad_creds)
            client.list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Checks that the configured self.ext_net_name is among the networks returned by
        neutron_utils.get_external_networks()
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external = neutron_utils.get_external_networks(client)

        # any() stops at the first match, like the explicit loop-and-break it replaces
        ext_net_found = any(net['network']['name'] == self.ext_net_name for net in external)
        self.assertTrue(ext_net_found)
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Tests for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a uniquely named network configuration used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        # Populated by a test once a network has been created so tearDown() knows what to remove
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net')

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Tests the neutron_utils.create_network() function with an empty network name
        """
        with self.assertRaises(Exception):
            # os_creds is passed like every other create_network() call in this module; omitting it made
            # the call fail on its argument list instead of on the empty name
            self.network = neutron_utils.create_network(self.neutron, self.os_creds, NetworkSettings(name=''))

    def test_create_network_null_name(self):
        """
        Tests the neutron_utils.create_network() function when the network name is None
        """
        with self.assertRaises(Exception):
            # Same call shape as the positive test so the failure is attributable to the missing name
            self.network = neutron_utils.create_network(self.neutron, self.os_creds, NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Tests for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a uniquely named network/subnet configuration used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        # Populated by the tests as objects get created so tearDown() knows what to remove
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def _create_validated_network(self):
        """
        Creates the network described by self.net_config, stores it in self.network and asserts that it
        can be found by name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_setting, self.os_creds, network=self.network)
        # The helper only reports a boolean, so it has to be asserted to have any effect on the test
        self.assertTrue(validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr, True))

    def test_create_subnet_null_name(self):
        """
        Tests the SubnetSettings constructor for an Exception when the subnet name is None
        """
        self._create_validated_network()

        # Resolved outside of assertRaises so that only the SubnetSettings constructor can satisfy it
        cidr = self.net_config.network_settings.subnet_settings[0].cidr
        with self.assertRaises(Exception):
            SubnetSettings(cidr=cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the configured subnet and then looks a subnet up using an empty name
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        neutron_utils.create_subnet(self.neutron, subnet_setting, self.os_creds, network=self.network)
        # NOTE(review): the subnet is created with its configured (non-empty) name and the lookup result is
        # not asserted - confirm what this test is meant to prove
        validate_subnet(self.neutron, '', subnet_setting.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an Exception when the subnet CIDR value is None
        """
        self._create_validated_network()

        subnet_name = self.net_config.network_settings.subnet_settings[0].name
        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr=None, name=subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests the neutron_utils.create_subnet() function for an Exception when the subnet CIDR value is empty
        """
        self._create_validated_network()

        subnet_name = self.net_config.network_settings.subnet_settings[0].name
        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr='', name=subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Tests for creating routers, router interfaces and ports via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron client and a uniquely named network/subnet/router configuration used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        # Populated by the tests as objects get created so tearDown() knows what to remove
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        # The interface is detached first and the network is removed last
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            validate_router(self.neutron, self.router.get('name'), False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'], False)

    def _create_validated_network(self):
        """
        Creates the network described by self.net_config, stores it in self.network and asserts that it
        can be found by name
        """
        self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
        self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
        self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))

    def _create_validated_subnet(self):
        """
        Creates the first configured subnet on self.network, stores it in self.subnet and asserts that it
        can be found by name with the configured CIDR
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(self.neutron, subnet_setting, self.os_creds, self.network)
        self.assertTrue(validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr, True))

    def _create_validated_router(self):
        """
        Creates the router described by self.net_config, stores it in self.router and asserts that it can be
        found by name
        """
        self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
        self.assertTrue(validate_router(self.neutron, self.net_config.router_settings.name, True))

    def _ip_addrs(self, ip):
        """
        Returns the PortSettings 'ip_addrs' value binding the given IP to the first configured subnet
        :param ip: the IP value to request for the port
        :return: a single element list holding the subnet name and the IP
        """
        return [{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip}]

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the default router configuration
        """
        self._create_validated_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function when an external gateway is requested
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            subnet_setting.name,
            subnet_setting.cidr, self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_validated_router()
        # TODO - Add validation that the router gateway has been set

    def test_create_router_empty_name(self):
        """
        Tests the neutron_utils.create_router() function for an Exception when the router name is empty
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)

    def test_create_router_null_name(self):
        """
        Tests the neutron_utils.create_router() function for an Exception when the router name is None
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_validated_network()
        self._create_validated_subnet()
        self._create_validated_router()

        self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
        # NOTE(review): the helper's boolean result is not asserted here
        validate_interface_router(self.interface_router, self.router, self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
        """
        self._create_validated_network()
        self._create_validated_subnet()

        # self.router is still the None assigned in setUp()
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
        """
        self._create_validated_network()
        self._create_validated_router()

        # self.subnet is still the None assigned in setUp()
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_validated_network()
        self._create_validated_subnet()

        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name,
                ip_addrs=self._ip_addrs(ip_1),
                network_name=self.net_config.network_settings.name))
        self.assertTrue(validate_port(self.neutron, self.port, self.port_name))

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_validated_network()
        self._create_validated_subnet()

        # NOTE(review): despite the test name the port is created with self.port_name - confirm the intent
        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(ip_1)))
        self.assertTrue(validate_port(self.neutron, self.port, self.port_name))

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the port name value is None
        """
        self._create_validated_network()
        self._create_validated_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(ip_1)))

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the network is None
        """
        self._create_validated_network()
        self._create_validated_subnet()

        with self.assertRaises(Exception):
            # Keyword arguments make the missing network the only invalid input
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=None,
                ip_addrs=self._ip_addrs(ip_1)))

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is None
        """
        self._create_validated_network()
        self._create_validated_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs(None)))

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is not an IP address
        """
        self._create_validated_network()
        self._create_validated_subnet()

        with self.assertRaises(Exception):
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs('foo')))

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when the IP value is a well-formed
        address other than the one used by the positive port tests
        """
        self._create_validated_network()
        self._create_validated_subnet()

        with self.assertRaises(Exception):
            # presumably outside of the configured subnet CIDR - verify against get_pub_net_config()
            self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
                name=self.port_name, network_name=self.net_config.network_settings.name,
                ip_addrs=self._ip_addrs('10.197.123.100')))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
    """
    Tests for creating security groups via neutron_utils.py
    """

    def setUp(self):
        """
        Builds the neutron and keystone clients and a unique security group name used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'

        # Populated by the tests as objects get created so tearDown() knows what to remove
        self.security_group = None
        self.security_group_rules = list()
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.keystone = keystone_utils.keystone_client(self.os_creds)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects
        """
        for rule in self.security_group_rules:
            neutron_utils.delete_security_group_rule(self.neutron, rule)

        if self.security_group:
            neutron_utils.delete_security_group(self.neutron, self.security_group)

    def test_create_delete_simple_sec_grp(self):
        """
        Tests the neutron_utils.create_security_group() and neutron_utils.delete_security_group() functions
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
        self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

        # assertEqual, not assertTrue: assertTrue treats its second argument as the failure message
        self.assertEqual(sec_grp_settings.name, self.security_group['security_group']['name'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_group['security_group'], sec_grp_get['security_group']))

        neutron_utils.delete_security_group(self.neutron, self.security_group)
        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNone(sec_grp_get)
        # Already deleted above, so tearDown() must not try to delete it again
        self.security_group = None

    def test_create_sec_grp_no_name(self):
        """
        Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
        that attempting to create a security group without a name will raise an exception
        """
        with self.assertRaises(Exception):
            sec_grp_settings = SecurityGroupSettings()
            self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

    def test_create_sec_grp_no_rules(self):
        """
        Tests the neutron_utils.create_security_group() function with a description and no rules
        """
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
        self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)

        self.assertEqual(sec_grp_settings.name, self.security_group['security_group']['name'])
        self.assertEqual(sec_grp_settings.description, self.security_group['security_group']['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_group['security_group'], sec_grp_get['security_group']))

    def test_create_sec_grp_one_rule(self):
        """
        Tests the neutron_utils.create_security_group() and neutron_utils.create_security_group_rule() functions
        """

        sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
        sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
                                                 rule_settings=[sec_grp_rule_settings])

        self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
        # Rules already attached to the new group are tracked too, so tearDown() removes them
        free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group)
        for free_rule in free_rules:
            self.security_group_rules.append(free_rule)

        self.security_group_rules.append(
            neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))

        # Refresh object so it is populated with the newly added rule
        self.security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)

        rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group)

        self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))

        self.assertEqual(sec_grp_settings.name, self.security_group['security_group']['name'])
        self.assertEqual(sec_grp_settings.description, self.security_group['security_group']['description'])

        sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
        self.assertIsNotNone(sec_grp_get)
        self.assertTrue(validation_utils.objects_equivalent(
            self.security_group['security_group'], sec_grp_get['security_group']))
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the expectation.
    Returns true when the network is found and exists is true, or when it is not found and exists is false.
    Returns false in the two remaining cases.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not the network is expected to exist
    :return: True/False
    """
    found = bool(neutron_utils.get_network(neutron, name))
    expected = bool(exists)
    return found == expected
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the expectation.
    When exists is true, returns true only if the subnet is found and its 'cidr' equals the expected CIDR.
    When exists is false, returns true only if no subnet with that name is found.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value (only compared when the subnet is expected to exist)
    :param exists: Whether or not the subnet is expected to exist
    :return: True/False
    """
    found = neutron_utils.get_subnet_by_name(neutron, name)

    # Expected to be absent: pass only when the lookup came back empty
    if not exists:
        return not found

    # Expected to be present but nothing was found
    if not found:
        return False

    return found.get('cidr') == cidr
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches the expectation.
    Returns true when the router is found and exists is true, or when it is not found and exists is false.
    Returns false in the two remaining cases.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router is expected to exist
    :return: True/False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    if exists and router:
        return True
    # Mirrors validate_network(): an absent router is the success case when it is expected to be gone
    if not exists and not router:
        return True
    return False
def validate_interface_router(interface_router, router, subnet):
    """
    Returns true if the router ID & subnet ID have been properly included into the interface router object
    :param interface_router: the object to validate; its 'subnet_id' and 'id' values are read
    :param router: to validate against the interface_router (its 'id' value is read)
    :param subnet: to validate against the interface_router (its 'id' value is read)
    :return: True if both IDs match else False
    """
    subnet_id = interface_router.get('subnet_id')
    # 'port_id' identifies the port created for the interface, never the router; the router's own ID is
    # expected under 'id' - NOTE(review): confirm against the add-router-interface response in use
    router_id = interface_router.get('id')

    return subnet.get('id') == subnet_id and router.get('id') == router_id
def validate_port(neutron, port_obj, this_port_name):
    """
    Returns true if the port with the same ID as port_obj is listed by neutron under the expected name.
    Returns false if the port is listed under a different name or is not listed at all.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup; its ID is read from port_obj['port']['id']
    :param this_port_name: The expected port name
    :return: True/False
    """
    ports = neutron.list_ports()
    # values() rather than iteritems(): the keys are unused and iteritems() does not exist on Python 3 dicts
    for port_insts in ports.values():
        for inst in port_insts:
            if inst['id'] == port_obj['port']['id']:
                return inst['name'] == this_port_name
    return False