Refactor network retrieval API calls.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests checking that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Connects with the configured credentials and expects the network
        named self.ext_net_name to be present in the client's network listing.
        """
        client = neutron_utils.neutron_client(self.os_creds)
        listed = client.list_networks().get('networks')

        self.assertTrue(
            any(net.get('name') == self.ext_net_name for net in listed))

    def test_neutron_connect_fail(self):
        """
        Expects an exception when a client built from bogus credentials is
        used to list networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        with self.assertRaises(Exception):
            bogus_creds = OSCreds(
                username='user', password='pass', auth_url='url',
                project_name='project')
            neutron_utils.neutron_client(bogus_creds).list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Calls neutron_utils.get_external_networks() and expects one of the
        returned networks to be named self.ext_net_name
        :return:
        """
        client = neutron_utils.neutron_client(self.os_creds)
        external = neutron_utils.get_external_networks(client)

        self.assertTrue(
            any(net.name == self.ext_net_name for net in external))
81
82
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Exercises network creation through neutron_utils.py
    """

    def setUp(self):
        """
        Builds a neutron client and a uniquely named network configuration
        """
        prefix = '{}-{}'.format(self.__class__.__name__, uuid.uuid4())
        self.port_name = prefix + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=prefix + '-pub-net')

    def tearDown(self):
        """
        Deletes the network created by the test, if any
        """
        if not self.network:
            return

        neutron_utils.delete_network(self.neutron, self.network)
        # NOTE(review): the result of this lookup is not asserted
        validate_network(self.neutron, self.network.name, False)

    def test_create_network(self):
        """
        Tests the neutron_utils.create_network() function with the
        configured network settings
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Expects an exception from NetworkSettings or
        neutron_utils.create_network() when the network name is empty
        """
        with self.assertRaises(Exception):
            empty_named = NetworkSettings(name='')
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, network_settings=empty_named)

    def test_create_network_null_name(self):
        """
        Expects an exception from NetworkSettings or
        neutron_utils.create_network() when no network name is given
        """
        with self.assertRaises(Exception):
            unnamed = NetworkSettings()
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds, network_settings=unnamed)
134
135
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Test for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds a neutron client and a uniquely named network/subnet
        configuration; nothing is created on the cloud here
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects: the subnet first (when a test
        recorded one in self.subnet), then the network
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.name,
                            self.net_config.network_settings.subnet_settings[
                                0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network.name, False)

    def _create_validated_network(self):
        """
        Creates the configured network, stores it in self.network and asserts
        that it carries the configured name and can be found
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function with the configured
        subnet settings
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        # NOTE(review): the result of validate_subnet is not asserted
        # anywhere in this class - confirm whether it should be
        validate_subnet(
            self.neutron, subnet_setting.name, subnet_setting.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Expects an Exception from the SubnetSettings constructor when no
        subnet name is given
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the configured subnet and then looks up a subnet with an
        empty name on the same CIDR
        """
        self._create_validated_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        # Record the subnet so tearDown deletes it explicitly, as
        # test_create_subnet does
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, network=self.network)
        # NOTE(review): the subnet is created with its configured name, and
        # this lookup by '' is not asserted - confirm the intended check
        validate_subnet(self.neutron, '', subnet_setting.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Expects an Exception from SubnetSettings or
        neutron_utils.create_subnet() when the subnet CIDR value is None
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr=None, name=self.net_config.subnet_name)
            neutron_utils.create_subnet(
                self.neutron, sub_sets, self.os_creds, network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Expects an Exception from SubnetSettings or
        neutron_utils.create_subnet() when the subnet CIDR value is empty
        """
        self._create_validated_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(
                cidr='', name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)
249
250
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Exercises router, router-interface and port creation through
    neutron_utils.py
    """

    def setUp(self):
        """
        Builds a neutron client and a uniquely named network/subnet/router
        configuration; nothing is created on the cloud here
        """
        prefix = '{}-{}'.format(self.__class__.__name__, uuid.uuid4())
        self.port_name = prefix + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = self.subnet = self.port = None
        self.router = self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=prefix + '-pub-net',
            subnet_name=prefix + '-pub-subnet',
            router_name=prefix + '-pub-router',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Removes whichever remote objects the test recorded: router
        interface, router, port, subnet and finally the network
        """
        client = self.neutron

        if self.interface_router:
            neutron_utils.remove_interface_router(
                client, self.router, self.subnet)

        if self.router:
            neutron_utils.delete_router(client, self.router)
            validate_router(client, self.router.name, False)

        if self.port:
            neutron_utils.delete_port(client, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(client, self.subnet)
            configured = self.net_config.network_settings.subnet_settings[0]
            validate_subnet(
                client, self.subnet.name, configured.cidr, False)

        if self.network:
            neutron_utils.delete_network(client, self.network)
            validate_network(client, self.network.name, False)

    def _make_network(self):
        """
        Creates the configured network into self.network and asserts its
        name and that it can be found
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network.name)
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))

    def _make_subnet(self):
        """
        Creates the first configured subnet on self.network into self.subnet
        and returns the settings it was created from
        """
        sub_settings = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, sub_settings, self.os_creds, self.network)
        validate_subnet(
            self.neutron, sub_settings.name, sub_settings.cidr, True)
        return sub_settings

    def _make_router(self):
        """
        Creates the configured router into self.router and looks it up
        """
        router_settings = self.net_config.router_settings
        self.router = neutron_utils.create_router(
            self.neutron, self.os_creds, router_settings)
        validate_router(self.neutron, router_settings.name, True)

    def _make_port(self, subnet_name, ip, named=True):
        """
        Requests a port on the configured network with one fixed IP entry
        and stores the result in self.port

        :param subnet_name: value for the 'subnet_name' key of the IP entry
        :param ip: value for the 'ip' key of the IP entry
        :param named: when False, no name is passed to PortSettings
        """
        port_kwargs = {
            'network_name': self.net_config.network_settings.name,
            'ip_addrs': [{'subnet_name': subnet_name, 'ip': ip}],
        }
        if named:
            port_kwargs['name'] = self.port_name
        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(**port_kwargs))

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the
        configured router settings
        """
        self._make_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function with a
        configuration rebuilt around self.ext_net_name, and expects the
        router's external gateway to reference that network
        """
        current = self.net_config
        sub_settings = current.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            current.network_settings.name,
            sub_settings.name,
            sub_settings.cidr,
            current.router_settings.name,
            self.ext_net_name)
        self._make_router()

        ext_net = neutron_utils.get_network(
            self.neutron, network_name=self.ext_net_name)
        gateway_info = self.router.external_gateway_info
        self.assertEqual(gateway_info['network_id'], ext_net.id)

    def test_create_router_empty_name(self):
        """
        Expects an Exception from RouterSettings or
        neutron_utils.create_router() when the router name is empty
        """
        with self.assertRaises(Exception):
            empty_named = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(
                self.neutron, self.os_creds, empty_named)

    def test_create_router_null_name(self):
        """
        Expects an Exception from RouterSettings or
        neutron_utils.create_router() when no router name is given
        """
        with self.assertRaises(Exception):
            unnamed = create_router.RouterSettings()
            self.router = neutron_utils.create_router(
                self.neutron, self.os_creds, unnamed)
            # NOTE(review): only reached when nothing above raised; an
            # exception from this lookup would also satisfy assertRaises
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function with a
        freshly created network, subnet and router
        """
        self._make_network()
        self._make_subnet()
        self._make_router()

        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.subnet)
        validate_interface_router(
            self.interface_router, self.router, self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Expects a NeutronException from neutron_utils.add_interface_router()
        when the router value is None
        """
        self._make_network()
        self._make_subnet()

        # self.router is still None here because no router was created
        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Expects a NeutronException from neutron_utils.add_interface_router()
        when the subnet value is None
        """
        self._make_network()
        self._make_router()

        # self.subnet is still None here because no subnet was created
        with self.assertRaises(NeutronException):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function with a name and a
        fixed IP on the created subnet
        """
        self._make_network()
        sub_settings = self._make_subnet()

        self._make_port(sub_settings.name, ip_1)
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function

        NOTE(review): despite the test name, the port is created with
        self.port_name, exactly as in test_create_port - confirm intent
        """
        self._make_network()
        sub_settings = self._make_subnet()

        self._make_port(sub_settings.name, ip_1)
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Expects an Exception from PortSettings or
        neutron_utils.create_port() when no port name is given
        """
        self._make_network()
        sub_settings = self._make_subnet()

        with self.assertRaises(Exception):
            self._make_port(sub_settings.name, ip_1, named=False)

    def test_create_port_null_network_object(self):
        """
        Expects an Exception from PortSettings or
        neutron_utils.create_port() when the configured network has not
        been created
        """
        with self.assertRaises(Exception):
            self._make_port(
                self.net_config.network_settings.subnet_settings[0].name,
                ip_1)

    def test_create_port_null_ip(self):
        """
        Expects an Exception from PortSettings or
        neutron_utils.create_port() when the IP value is None
        """
        self._make_network()
        sub_settings = self._make_subnet()

        with self.assertRaises(Exception):
            self._make_port(sub_settings.name, None)

    def test_create_port_invalid_ip(self):
        """
        Expects an Exception from PortSettings or
        neutron_utils.create_port() when the IP value is the string 'foo'
        """
        self._make_network()
        sub_settings = self._make_subnet()

        with self.assertRaises(Exception):
            self._make_port(sub_settings.name, 'foo')

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Expects an Exception from PortSettings or
        neutron_utils.create_port() when requesting the IP '10.197.123.100'
        on the created subnet
        """
        self._make_network()
        sub_settings = self._make_subnet()

        with self.assertRaises(Exception):
            self._make_port(sub_settings.name, '10.197.123.100')
609
610
611 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
612     """
613     Test for creating security groups via neutron_utils.py
614     """
615
616     def setUp(self):
617         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
618         self.sec_grp_name = guid + 'name'
619
620         self.security_groups = list()
621         self.security_group_rules = list()
622         self.neutron = neutron_utils.neutron_client(self.os_creds)
623         self.keystone = keystone_utils.keystone_client(self.os_creds)
624
625     def tearDown(self):
626         """
627         Cleans the remote OpenStack objects
628         """
629         for rule in self.security_group_rules:
630             neutron_utils.delete_security_group_rule(self.neutron, rule)
631
632         for security_group in self.security_groups:
633             try:
634                 neutron_utils.delete_security_group(self.neutron,
635                                                     security_group)
636             except:
637                 pass
638
639     def test_create_delete_simple_sec_grp(self):
640         """
641         Tests the neutron_utils.create_security_group() function
642         """
643         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
644         security_group = neutron_utils.create_security_group(self.neutron,
645                                                              self.keystone,
646                                                              sec_grp_settings)
647
648         self.assertTrue(sec_grp_settings.name, security_group.name)
649
650         sec_grp_get = neutron_utils.get_security_group(self.neutron,
651                                                        sec_grp_settings.name)
652         self.assertIsNotNone(sec_grp_get)
653         self.assertTrue(validation_utils.objects_equivalent(
654             security_group, sec_grp_get))
655
656         neutron_utils.delete_security_group(self.neutron, security_group)
657         sec_grp_get = neutron_utils.get_security_group(self.neutron,
658                                                        sec_grp_settings.name)
659         self.assertIsNone(sec_grp_get)
660
661     def test_create_sec_grp_no_name(self):
662         """
663         Tests the SecurityGroupSettings constructor and
664         neutron_utils.create_security_group() function to ensure that
665         attempting to create a security group without a name will raise an
666         exception
667         """
668         with self.assertRaises(Exception):
669             sec_grp_settings = SecurityGroupSettings()
670             self.security_groups.append(
671                 neutron_utils.create_security_group(self.neutron,
672                                                     self.keystone,
673                                                     sec_grp_settings))
674
675     def test_create_sec_grp_no_rules(self):
676         """
677         Tests the neutron_utils.create_security_group() function
678         """
679         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
680                                                  description='hello group')
681         self.security_groups.append(
682             neutron_utils.create_security_group(self.neutron, self.keystone,
683                                                 sec_grp_settings))
684
685         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
686         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
687
688         sec_grp_get = neutron_utils.get_security_group(self.neutron,
689                                                        sec_grp_settings.name)
690         self.assertIsNotNone(sec_grp_get)
691         self.assertEqual(self.security_groups[0], sec_grp_get)
692
693     def test_create_sec_grp_one_rule(self):
694         """
695         Tests the neutron_utils.create_security_group() function
696         """
697
698         sec_grp_rule_settings = SecurityGroupRuleSettings(
699             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
700         sec_grp_settings = SecurityGroupSettings(
701             name=self.sec_grp_name, description='hello group',
702             rule_settings=[sec_grp_rule_settings])
703
704         self.security_groups.append(
705             neutron_utils.create_security_group(self.neutron, self.keystone,
706                                                 sec_grp_settings))
707         free_rules = neutron_utils.get_rules_by_security_group(
708             self.neutron, self.security_groups[0])
709         for free_rule in free_rules:
710             self.security_group_rules.append(free_rule)
711
712         self.security_group_rules.append(
713             neutron_utils.create_security_group_rule(
714                 self.neutron, sec_grp_settings.rule_settings[0]))
715
716         # Refresh object so it is populated with the newly added rule
717         security_group = neutron_utils.get_security_group(
718             self.neutron, sec_grp_settings.name)
719
720         rules = neutron_utils.get_rules_by_security_group(self.neutron,
721                                                           security_group)
722
723         self.assertTrue(
724             validation_utils.objects_equivalent(
725                  self.security_group_rules, rules))
726
727         self.assertTrue(sec_grp_settings.name, security_group.name)
728
729         sec_grp_get = neutron_utils.get_security_group(self.neutron,
730                                                        sec_grp_settings.name)
731         self.assertIsNotNone(sec_grp_get)
732         self.assertEqual(security_group, sec_grp_get)
733
734     def test_get_sec_grp_by_id(self):
735         """
736         Tests the neutron_utils.create_security_group() function
737         """
738
739         self.security_groups.append(neutron_utils.create_security_group(
740             self.neutron, self.keystone,
741             SecurityGroupSettings(name=self.sec_grp_name + '-1',
742                                   description='hello group')))
743         self.security_groups.append(neutron_utils.create_security_group(
744             self.neutron, self.keystone,
745             SecurityGroupSettings(name=self.sec_grp_name + '-2',
746                                   description='hello group')))
747
748         sec_grp_1b = neutron_utils.get_security_group_by_id(
749             self.neutron, self.security_groups[0].id)
750         sec_grp_2b = neutron_utils.get_security_group_by_id(
751             self.neutron, self.security_groups[1].id)
752
753         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
754         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
755
756
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions of neutron_utils
    """

    def setUp(self):
        """
        Creates the neutron client and starts with no floating IP tracked
        """
        self.floating_ip = None
        self.neutron = neutron_utils.neutron_client(self.os_creds)

    def tearDown(self):
        """
        Deletes the floating IP created by the test, if there is one
        """
        if not self.floating_ip:
            return
        neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP on the external network, that the
        number of floating IPs grows by one and that the new floating IP can
        be retrieved with the same ID and IP
        :return:
        """
        fips_before = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.ext_net_name)

        fips_after = neutron_utils.get_floating_ips(self.neutron)
        expected_count = len(fips_before) + 1
        self.assertEqual(expected_count, len(fips_after))

        fetched = neutron_utils.get_floating_ip(self.neutron,
                                                self.floating_ip)
        self.assertEqual(self.floating_ip.id, fetched.id)
        self.assertEqual(self.floating_ip.ip, fetched.ip)
792
793
794 """
795 Validation routines
796 """
797
798
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the
    expectation expressed by the 'exists' parameter.
    :param neutron: The neutron client
    :param name: The name of the network to look up
    :param exists: True when the network is expected to exist, False when it
                   is expected to be absent
    :return: True when the lookup result agrees with 'exists' else False
    """
    found = neutron_utils.get_network(neutron, network_name=name)
    # Valid when both are truthy (expected and found) or both are falsy
    # (not expected and not found)
    return bool(found) == bool(exists)
815
816
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the
    expectation expressed by the 'exists' parameter and, when the subnet is
    expected and found, whether its CIDR matches too.
    :param neutron: The neutron client
    :param name: The name of the subnet to look up
    :param cidr: The expected CIDR value (only compared when the subnet is
                 expected and found)
    :param exists: True when the subnet is expected to exist, False when it
                   is expected to be absent
    :return: True/False
    """
    found = neutron_utils.get_subnet_by_name(neutron, name)
    if not found:
        # Nothing was found: valid only when absence was expected
        return not exists
    if not exists:
        # Something was found although absence was expected
        return False
    return found.cidr == cidr
834
835
def validate_router(neutron, name, exists):
    """
    Returns true if a router for a given name DOES NOT exist if the exists
    parameter is false conversely true. Returns false if a router for a given
    name DOES exist if the exists parameter is false conversely true.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not the router name should exist or not
    :return: True/False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    if exists and router:
        return True
    # An absent router is the valid outcome when it is not expected to exist
    if not exists and not router:
        return True
    return False
850
851
def validate_interface_router(interface_router, router, subnet):
    """
    Checks the subnet_id and port_id values held by the interface router
    object against the IDs of the given subnet and router.
    NOTE(review): router.id is compared with interface_router.port_id - confirm
    that port_id is the attribute meant to carry the router ID.
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    expected_subnet_id = interface_router.subnet_id
    expected_router_id = interface_router.port_id

    if not (subnet.id == expected_subnet_id):
        return False
    return router.id == expected_router_id
865
866
def validate_port(neutron, port_obj, this_port_name):
    """
    Returns true if the neutron client lists a port with the same ID as
    port_obj and that port has the expected name. Returns false if the listed
    port has a different name or no listed port has that ID.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (only its 'id' is used)
    :param this_port_name: The expected port name
    :return: True/False
    """
    os_ports = neutron.list_ports()
    # Only the listed values are inspected; the keys of the response are not
    # needed
    for os_port_insts in os_ports.values():
        for os_inst in os_port_insts:
            if os_inst['id'] == port_obj.id:
                return os_inst['name'] == this_port_name
    return False