Merge "Created new class AnsibleException."
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27 from snaps.openstack.utils.neutron_utils import NeutronException
28
29 __author__ = 'spisarski'
30
31 ip_1 = '10.55.1.100'
32 ip_2 = '10.55.1.200'
33
34
35 class NeutronSmokeTests(OSComponentTestCase):
36     """
37     Tests to ensure that the neutron client can communicate with the cloud
38     """
39
40     def test_neutron_connect_success(self):
41         """
42         Tests to ensure that the proper credentials can connect.
43         """
44         neutron = neutron_utils.neutron_client(self.os_creds)
45
46         networks = neutron.list_networks()
47
48         found = False
49         networks = networks.get('networks')
50         for network in networks:
51             if network.get('name') == self.ext_net_name:
52                 found = True
53         self.assertTrue(found)
54
55     def test_neutron_connect_fail(self):
56         """
57         Tests to ensure that the improper credentials cannot connect.
58         """
59         from snaps.openstack.os_credentials import OSCreds
60
61         with self.assertRaises(Exception):
62             neutron = neutron_utils.neutron_client(
63                 OSCreds(username='user', password='pass', auth_url='url',
64                         project_name='project'))
65             neutron.list_networks()
66
67     def test_retrieve_ext_network_name(self):
68         """
69         Tests the neutron_utils.get_external_network_names to ensure the
70         configured self.ext_net_name is contained within the returned list
71         :return:
72         """
73         neutron = neutron_utils.neutron_client(self.os_creds)
74         ext_networks = neutron_utils.get_external_networks(neutron)
75         found = False
76         for network in ext_networks:
77             if network.name == self.ext_net_name:
78                 found = True
79                 break
80         self.assertTrue(found)
81
82
83 class NeutronUtilsNetworkTests(OSComponentTestCase):
84     """
85     Test for creating networks via neutron_utils.py
86     """
87
88     def setUp(self):
89         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
90         self.port_name = str(guid) + '-port'
91         self.neutron = neutron_utils.neutron_client(self.os_creds)
92         self.network = None
93         self.net_config = openstack_tests.get_pub_net_config(
94             net_name=guid + '-pub-net')
95
96     def tearDown(self):
97         """
98         Cleans the remote OpenStack objects
99         """
100         if self.network:
101             neutron_utils.delete_network(self.neutron, self.network)
102             validate_network(self.neutron, self.network.name, False)
103
104     def test_create_network(self):
105         """
106         Tests the neutron_utils.create_neutron_net() function
107         """
108         self.network = neutron_utils.create_network(
109             self.neutron, self.os_creds, self.net_config.network_settings)
110         self.assertEqual(self.net_config.network_settings.name,
111                          self.network.name)
112         self.assertTrue(validate_network(
113             self.neutron, self.net_config.network_settings.name, True))
114
115     def test_create_network_empty_name(self):
116         """
117         Tests the neutron_utils.create_neutron_net() function with an empty
118         network name
119         """
120         with self.assertRaises(Exception):
121             self.network = neutron_utils.create_network(
122                 self.neutron, self.os_creds,
123                 network_settings=NetworkSettings(name=''))
124
125     def test_create_network_null_name(self):
126         """
127         Tests the neutron_utils.create_neutron_net() function when the network
128         name is None
129         """
130         with self.assertRaises(Exception):
131             self.network = neutron_utils.create_network(
132                 self.neutron, self.os_creds,
133                 network_settings=NetworkSettings())
134
135
136 class NeutronUtilsSubnetTests(OSComponentTestCase):
137     """
138     Test for creating networks with subnets via neutron_utils.py
139     """
140
141     def setUp(self):
142         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
143         self.port_name = str(guid) + '-port'
144         self.neutron = neutron_utils.neutron_client(self.os_creds)
145         self.network = None
146         self.subnet = None
147         self.net_config = openstack_tests.get_pub_net_config(
148             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
149             external_net=self.ext_net_name)
150
151     def tearDown(self):
152         """
153         Cleans the remote OpenStack objects
154         """
155         if self.subnet:
156             neutron_utils.delete_subnet(self.neutron, self.subnet)
157             validate_subnet(self.neutron, self.subnet.name,
158                             self.net_config.network_settings.subnet_settings[
159                                 0].cidr, False)
160
161         if self.network:
162             neutron_utils.delete_network(self.neutron, self.network)
163             validate_network(self.neutron, self.network.name, False)
164
165     def test_create_subnet(self):
166         """
167         Tests the neutron_utils.create_neutron_net() function
168         """
169         self.network = neutron_utils.create_network(
170             self.neutron, self.os_creds, self.net_config.network_settings)
171         self.assertEqual(self.net_config.network_settings.name,
172                          self.network.name)
173         self.assertTrue(validate_network(
174             self.neutron, self.net_config.network_settings.name, True))
175
176         subnet_setting = self.net_config.network_settings.subnet_settings[0]
177         self.subnet = neutron_utils.create_subnet(
178             self.neutron, subnet_setting, self.os_creds, network=self.network)
179         validate_subnet(
180             self.neutron, subnet_setting.name, subnet_setting.cidr, True)
181
182     def test_create_subnet_null_name(self):
183         """
184         Tests the neutron_utils.create_neutron_subnet() function for an
185         Exception when the subnet name is None
186         """
187         self.network = neutron_utils.create_network(
188             self.neutron, self.os_creds, self.net_config.network_settings)
189         self.assertEqual(self.net_config.network_settings.name,
190                          self.network.name)
191         self.assertTrue(validate_network(
192             self.neutron, self.net_config.network_settings.name, True))
193
194         with self.assertRaises(Exception):
195             SubnetSettings(cidr=self.net_config.subnet_cidr)
196
197     def test_create_subnet_empty_name(self):
198         """
199         Tests the neutron_utils.create_neutron_net() function with an empty
200         name
201         """
202         self.network = neutron_utils.create_network(
203             self.neutron, self.os_creds, self.net_config.network_settings)
204         self.assertEqual(self.net_config.network_settings.name,
205                          self.network.name)
206         self.assertTrue(validate_network(
207             self.neutron, self.net_config.network_settings.name, True))
208
209         subnet_setting = self.net_config.network_settings.subnet_settings[0]
210         neutron_utils.create_subnet(
211             self.neutron, subnet_setting, self.os_creds, network=self.network)
212         validate_subnet(self.neutron, '', subnet_setting.cidr, True)
213
214     def test_create_subnet_null_cidr(self):
215         """
216         Tests the neutron_utils.create_neutron_subnet() function for an
217         Exception when the subnet CIDR value is None
218         """
219         self.network = neutron_utils.create_network(
220             self.neutron, self.os_creds, self.net_config.network_settings)
221         self.assertEqual(self.net_config.network_settings.name,
222                          self.network.name)
223         self.assertTrue(validate_network(
224             self.neutron, self.net_config.network_settings.name, True))
225
226         with self.assertRaises(Exception):
227             sub_sets = SubnetSettings(
228                 cidr=None, name=self.net_config.subnet_name)
229             neutron_utils.create_subnet(
230                 self.neutron, sub_sets, self.os_creds, network=self.network)
231
232     def test_create_subnet_empty_cidr(self):
233         """
234         Tests the neutron_utils.create_neutron_subnet() function for an
235         Exception when the subnet CIDR value is empty
236         """
237         self.network = neutron_utils.create_network(
238             self.neutron, self.os_creds, self.net_config.network_settings)
239         self.assertEqual(self.net_config.network_settings.name,
240                          self.network.name)
241         self.assertTrue(validate_network(
242             self.neutron, self.net_config.network_settings.name, True))
243
244         with self.assertRaises(Exception):
245             sub_sets = SubnetSettings(
246                 cidr='', name=self.net_config.subnet_name)
247             neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
248                                         network=self.network)
249
250
251 class NeutronUtilsRouterTests(OSComponentTestCase):
252     """
253     Test for creating routers via neutron_utils.py
254     """
255
256     def setUp(self):
257         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
258         self.port_name = str(guid) + '-port'
259         self.neutron = neutron_utils.neutron_client(self.os_creds)
260         self.network = None
261         self.subnet = None
262         self.port = None
263         self.router = None
264         self.interface_router = None
265         self.net_config = openstack_tests.get_pub_net_config(
266             net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
267             router_name=guid + '-pub-router', external_net=self.ext_net_name)
268
269     def tearDown(self):
270         """
271         Cleans the remote OpenStack objects
272         """
273         if self.interface_router:
274             neutron_utils.remove_interface_router(self.neutron, self.router,
275                                                   self.subnet)
276
277         if self.router:
278             neutron_utils.delete_router(self.neutron, self.router)
279             validate_router(self.neutron, self.router.name, False)
280
281         if self.port:
282             neutron_utils.delete_port(self.neutron, self.port)
283
284         if self.subnet:
285             neutron_utils.delete_subnet(self.neutron, self.subnet)
286             validate_subnet(
287                 self.neutron, self.subnet.name,
288                 self.net_config.network_settings.subnet_settings[0].cidr,
289                 False)
290
291         if self.network:
292             neutron_utils.delete_network(self.neutron, self.network)
293             validate_network(self.neutron, self.network.name, False)
294
295     def test_create_router_simple(self):
296         """
297         Tests the neutron_utils.create_neutron_net() function when an external
298         gateway is requested
299         """
300         self.router = neutron_utils.create_router(
301             self.neutron, self.os_creds, self.net_config.router_settings)
302         validate_router(self.neutron, self.net_config.router_settings.name,
303                         True)
304
305     def test_create_router_with_public_interface(self):
306         """
307         Tests the neutron_utils.create_neutron_net() function when an external
308         gateway is requested
309         """
310         subnet_setting = self.net_config.network_settings.subnet_settings[0]
311         self.net_config = openstack_tests.OSNetworkConfig(
312             self.net_config.network_settings.name,
313             subnet_setting.name,
314             subnet_setting.cidr,
315             self.net_config.router_settings.name,
316             self.ext_net_name)
317         self.router = neutron_utils.create_router(
318             self.neutron, self.os_creds, self.net_config.router_settings)
319         validate_router(self.neutron, self.net_config.router_settings.name,
320                         True)
321         # TODO - Add validation that the router gatway has been set
322
323     def test_create_router_empty_name(self):
324         """
325         Tests the neutron_utils.create_neutron_net() function
326         """
327         with self.assertRaises(Exception):
328             this_router_settings = create_router.RouterSettings(name='')
329             self.router = neutron_utils.create_router(self.neutron,
330                                                       self.os_creds,
331                                                       this_router_settings)
332
333     def test_create_router_null_name(self):
334         """
335         Tests the neutron_utils.create_neutron_subnet() function when the
336         subnet CIDR value is None
337         """
338         with self.assertRaises(Exception):
339             this_router_settings = create_router.RouterSettings()
340             self.router = neutron_utils.create_router(self.neutron,
341                                                       self.os_creds,
342                                                       this_router_settings)
343             validate_router(self.neutron, None, True)
344
345     def test_add_interface_router(self):
346         """
347         Tests the neutron_utils.add_interface_router() function
348         """
349         self.network = neutron_utils.create_network(
350             self.neutron, self.os_creds, self.net_config.network_settings)
351         self.assertEqual(self.net_config.network_settings.name,
352                          self.network.name)
353         self.assertTrue(validate_network(
354             self.neutron, self.net_config.network_settings.name, True))
355
356         subnet_setting = self.net_config.network_settings.subnet_settings[0]
357         self.subnet = neutron_utils.create_subnet(
358             self.neutron, subnet_setting,
359             self.os_creds, self.network)
360         validate_subnet(
361             self.neutron,
362             subnet_setting.name,
363             subnet_setting.cidr, True)
364
365         self.router = neutron_utils.create_router(
366             self.neutron, self.os_creds, self.net_config.router_settings)
367         validate_router(self.neutron, self.net_config.router_settings.name,
368                         True)
369
370         self.interface_router = neutron_utils.add_interface_router(
371             self.neutron, self.router, self.subnet)
372         validate_interface_router(self.interface_router, self.router,
373                                   self.subnet)
374
375     def test_add_interface_router_null_router(self):
376         """
377         Tests the neutron_utils.add_interface_router() function for an
378         Exception when the router value is None
379         """
380         self.network = neutron_utils.create_network(
381             self.neutron, self.os_creds, self.net_config.network_settings)
382         self.assertEqual(self.net_config.network_settings.name,
383                          self.network.name)
384         self.assertTrue(validate_network(
385             self.neutron, self.net_config.network_settings.name, True))
386
387         subnet_setting = self.net_config.network_settings.subnet_settings[0]
388         self.subnet = neutron_utils.create_subnet(
389             self.neutron, subnet_setting,
390             self.os_creds, self.network)
391         validate_subnet(
392             self.neutron, subnet_setting.name, subnet_setting.cidr, True)
393
394         with self.assertRaises(NeutronException):
395             self.interface_router = neutron_utils.add_interface_router(
396                 self.neutron, self.router, self.subnet)
397
398     def test_add_interface_router_null_subnet(self):
399         """
400         Tests the neutron_utils.add_interface_router() function for an
401         Exception when the subnet value is None
402         """
403         self.network = neutron_utils.create_network(
404             self.neutron, self.os_creds, self.net_config.network_settings)
405         self.assertEqual(self.net_config.network_settings.name,
406                          self.network.name)
407         self.assertTrue(validate_network(
408             self.neutron, self.net_config.network_settings.name, True))
409
410         self.router = neutron_utils.create_router(
411             self.neutron, self.os_creds, self.net_config.router_settings)
412         validate_router(self.neutron, self.net_config.router_settings.name,
413                         True)
414
415         with self.assertRaises(NeutronException):
416             self.interface_router = neutron_utils.add_interface_router(
417                 self.neutron, self.router, self.subnet)
418
419     def test_create_port(self):
420         """
421         Tests the neutron_utils.create_port() function
422         """
423         self.network = neutron_utils.create_network(
424             self.neutron, self.os_creds, self.net_config.network_settings)
425         self.assertEqual(self.net_config.network_settings.name,
426                          self.network.name)
427         self.assertTrue(validate_network(
428             self.neutron, self.net_config.network_settings.name, True))
429
430         subnet_setting = self.net_config.network_settings.subnet_settings[0]
431         self.subnet = neutron_utils.create_subnet(
432             self.neutron, subnet_setting, self.os_creds, self.network)
433         validate_subnet(self.neutron, subnet_setting.name,
434                         subnet_setting.cidr, True)
435
436         self.port = neutron_utils.create_port(
437             self.neutron, self.os_creds, PortSettings(
438                 name=self.port_name,
439                 ip_addrs=[{
440                     'subnet_name': subnet_setting.name,
441                     'ip': ip_1}],
442                 network_name=self.net_config.network_settings.name))
443         validate_port(self.neutron, self.port, self.port_name)
444
445     def test_create_port_empty_name(self):
446         """
447         Tests the neutron_utils.create_port() function
448         """
449         self.network = neutron_utils.create_network(
450             self.neutron, self.os_creds, self.net_config.network_settings)
451         self.assertEqual(self.net_config.network_settings.name,
452                          self.network.name)
453         self.assertTrue(validate_network(
454             self.neutron, self.net_config.network_settings.name, True))
455
456         subnet_setting = self.net_config.network_settings.subnet_settings[0]
457         self.subnet = neutron_utils.create_subnet(
458             self.neutron, subnet_setting, self.os_creds, self.network)
459         validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
460                         True)
461
462         self.port = neutron_utils.create_port(
463             self.neutron, self.os_creds, PortSettings(
464                 name=self.port_name,
465                 network_name=self.net_config.network_settings.name,
466                 ip_addrs=[{
467                     'subnet_name': subnet_setting.name,
468                     'ip': ip_1}]))
469         validate_port(self.neutron, self.port, self.port_name)
470
471     def test_create_port_null_name(self):
472         """
473         Tests the neutron_utils.create_port() function for an Exception when
474         the port name value is None
475         """
476         self.network = neutron_utils.create_network(
477             self.neutron, self.os_creds, self.net_config.network_settings)
478         self.assertEqual(self.net_config.network_settings.name,
479                          self.network.name)
480         self.assertTrue(validate_network(
481             self.neutron, self.net_config.network_settings.name, True))
482
483         subnet_setting = self.net_config.network_settings.subnet_settings[0]
484         self.subnet = neutron_utils.create_subnet(
485             self.neutron, subnet_setting,
486             self.os_creds, self.network)
487         validate_subnet(
488             self.neutron,
489             subnet_setting.name,
490             subnet_setting.cidr, True)
491
492         with self.assertRaises(Exception):
493             self.port = neutron_utils.create_port(
494                 self.neutron, self.os_creds,
495                 PortSettings(
496                     network_name=self.net_config.network_settings.name,
497                     ip_addrs=[{
498                         'subnet_name': subnet_setting.name,
499                         'ip': ip_1}]))
500
501     def test_create_port_null_network_object(self):
502         """
503         Tests the neutron_utils.create_port() function for an Exception when
504         the network object is None
505         """
506         with self.assertRaises(Exception):
507             self.port = neutron_utils.create_port(
508                 self.neutron, self.os_creds,
509                 PortSettings(
510                     name=self.port_name,
511                     network_name=self.net_config.network_settings.name,
512                     ip_addrs=[{
513                         'subnet_name':
514                             self.net_config.network_settings.subnet_settings[
515                                 0].name,
516                         'ip': ip_1}]))
517
518     def test_create_port_null_ip(self):
519         """
520         Tests the neutron_utils.create_port() function for an Exception when
521         the IP value is None
522         """
523         self.network = neutron_utils.create_network(
524             self.neutron, self.os_creds, self.net_config.network_settings)
525         self.assertEqual(self.net_config.network_settings.name,
526                          self.network.name)
527         self.assertTrue(validate_network(
528             self.neutron, self.net_config.network_settings.name, True))
529
530         subnet_setting = self.net_config.network_settings.subnet_settings[0]
531         self.subnet = neutron_utils.create_subnet(
532             self.neutron, subnet_setting,
533             self.os_creds, self.network)
534         validate_subnet(
535             self.neutron,
536             subnet_setting.name,
537             subnet_setting.cidr, True)
538
539         with self.assertRaises(Exception):
540             self.port = neutron_utils.create_port(
541                 self.neutron, self.os_creds,
542                 PortSettings(
543                     name=self.port_name,
544                     network_name=self.net_config.network_settings.name,
545                     ip_addrs=[{
546                         'subnet_name': subnet_setting.name,
547                         'ip': None}]))
548
549     def test_create_port_invalid_ip(self):
550         """
551         Tests the neutron_utils.create_port() function for an Exception when
552         the IP value is None
553         """
554         self.network = neutron_utils.create_network(
555             self.neutron, self.os_creds, self.net_config.network_settings)
556         self.assertEqual(self.net_config.network_settings.name,
557                          self.network.name)
558         self.assertTrue(validate_network(
559             self.neutron, self.net_config.network_settings.name, True))
560
561         subnet_setting = self.net_config.network_settings.subnet_settings[0]
562         self.subnet = neutron_utils.create_subnet(
563             self.neutron, subnet_setting, self.os_creds, self.network)
564         validate_subnet(self.neutron,
565                         subnet_setting.name,
566                         subnet_setting.cidr, True)
567
568         with self.assertRaises(Exception):
569             self.port = neutron_utils.create_port(
570                 self.neutron, self.os_creds,
571                 PortSettings(
572                     name=self.port_name,
573                     network_name=self.net_config.network_settings.name,
574                     ip_addrs=[{
575                         'subnet_name': subnet_setting.name,
576                         'ip': 'foo'}]))
577
578     def test_create_port_invalid_ip_to_subnet(self):
579         """
580         Tests the neutron_utils.create_port() function for an Exception when
581         the IP value is None
582         """
583         self.network = neutron_utils.create_network(
584             self.neutron, self.os_creds, self.net_config.network_settings)
585         self.assertEqual(self.net_config.network_settings.name,
586                          self.network.name)
587         self.assertTrue(validate_network(
588             self.neutron, self.net_config.network_settings.name, True))
589
590         subnet_setting = self.net_config.network_settings.subnet_settings[0]
591         self.subnet = neutron_utils.create_subnet(
592             self.neutron, subnet_setting, self.os_creds, self.network)
593         validate_subnet(
594             self.neutron, subnet_setting.name, subnet_setting.cidr, True)
595
596         with self.assertRaises(Exception):
597             self.port = neutron_utils.create_port(
598                 self.neutron, self.os_creds,
599                 PortSettings(
600                     name=self.port_name,
601                     network_name=self.net_config.network_settings.name,
602                     ip_addrs=[{
603                         'subnet_name': subnet_setting.name,
604                         'ip': '10.197.123.100'}]))
605
606
607 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
608     """
609     Test for creating security groups via neutron_utils.py
610     """
611
612     def setUp(self):
613         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
614         self.sec_grp_name = guid + 'name'
615
616         self.security_groups = list()
617         self.security_group_rules = list()
618         self.neutron = neutron_utils.neutron_client(self.os_creds)
619         self.keystone = keystone_utils.keystone_client(self.os_creds)
620
621     def tearDown(self):
622         """
623         Cleans the remote OpenStack objects
624         """
625         for rule in self.security_group_rules:
626             neutron_utils.delete_security_group_rule(self.neutron, rule)
627
628         for security_group in self.security_groups:
629             try:
630                 neutron_utils.delete_security_group(self.neutron,
631                                                     security_group)
632             except:
633                 pass
634
635     def test_create_delete_simple_sec_grp(self):
636         """
637         Tests the neutron_utils.create_security_group() function
638         """
639         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
640         security_group = neutron_utils.create_security_group(self.neutron,
641                                                              self.keystone,
642                                                              sec_grp_settings)
643
644         self.assertTrue(sec_grp_settings.name, security_group.name)
645
646         sec_grp_get = neutron_utils.get_security_group(self.neutron,
647                                                        sec_grp_settings.name)
648         self.assertIsNotNone(sec_grp_get)
649         self.assertTrue(validation_utils.objects_equivalent(
650             security_group, sec_grp_get))
651
652         neutron_utils.delete_security_group(self.neutron, security_group)
653         sec_grp_get = neutron_utils.get_security_group(self.neutron,
654                                                        sec_grp_settings.name)
655         self.assertIsNone(sec_grp_get)
656
657     def test_create_sec_grp_no_name(self):
658         """
659         Tests the SecurityGroupSettings constructor and
660         neutron_utils.create_security_group() function to ensure that
661         attempting to create a security group without a name will raise an
662         exception
663         """
664         with self.assertRaises(Exception):
665             sec_grp_settings = SecurityGroupSettings()
666             self.security_groups.append(
667                 neutron_utils.create_security_group(self.neutron,
668                                                     self.keystone,
669                                                     sec_grp_settings))
670
671     def test_create_sec_grp_no_rules(self):
672         """
673         Tests the neutron_utils.create_security_group() function
674         """
675         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
676                                                  description='hello group')
677         self.security_groups.append(
678             neutron_utils.create_security_group(self.neutron, self.keystone,
679                                                 sec_grp_settings))
680
681         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
682         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
683
684         sec_grp_get = neutron_utils.get_security_group(self.neutron,
685                                                        sec_grp_settings.name)
686         self.assertIsNotNone(sec_grp_get)
687         self.assertEqual(self.security_groups[0], sec_grp_get)
688
689     def test_create_sec_grp_one_rule(self):
690         """
691         Tests the neutron_utils.create_security_group() function
692         """
693
694         sec_grp_rule_settings = SecurityGroupRuleSettings(
695             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
696         sec_grp_settings = SecurityGroupSettings(
697             name=self.sec_grp_name, description='hello group',
698             rule_settings=[sec_grp_rule_settings])
699
700         self.security_groups.append(
701             neutron_utils.create_security_group(self.neutron, self.keystone,
702                                                 sec_grp_settings))
703         free_rules = neutron_utils.get_rules_by_security_group(
704             self.neutron, self.security_groups[0])
705         for free_rule in free_rules:
706             self.security_group_rules.append(free_rule)
707
708         self.security_group_rules.append(
709             neutron_utils.create_security_group_rule(
710                 self.neutron, sec_grp_settings.rule_settings[0]))
711
712         # Refresh object so it is populated with the newly added rule
713         security_group = neutron_utils.get_security_group(
714             self.neutron, sec_grp_settings.name)
715
716         rules = neutron_utils.get_rules_by_security_group(self.neutron,
717                                                           security_group)
718
719         self.assertTrue(
720             validation_utils.objects_equivalent(
721                  self.security_group_rules, rules))
722
723         self.assertTrue(sec_grp_settings.name, security_group.name)
724
725         sec_grp_get = neutron_utils.get_security_group(self.neutron,
726                                                        sec_grp_settings.name)
727         self.assertIsNotNone(sec_grp_get)
728         self.assertEqual(security_group, sec_grp_get)
729
730     def test_get_sec_grp_by_id(self):
731         """
732         Tests the neutron_utils.create_security_group() function
733         """
734
735         self.security_groups.append(neutron_utils.create_security_group(
736             self.neutron, self.keystone,
737             SecurityGroupSettings(name=self.sec_grp_name + '-1',
738                                   description='hello group')))
739         self.security_groups.append(neutron_utils.create_security_group(
740             self.neutron, self.keystone,
741             SecurityGroupSettings(name=self.sec_grp_name + '-2',
742                                   description='hello group')))
743
744         sec_grp_1b = neutron_utils.get_security_group_by_id(
745             self.neutron, self.security_groups[0].id)
746         sec_grp_2b = neutron_utils.get_security_group_by_id(
747             self.neutron, self.security_groups[1].id)
748
749         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
750         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
751
752
753 class NeutronUtilsFloatingIpTests(OSComponentTestCase):
754     """
755     Test basic nova keypair functionality
756     """
757
758     def setUp(self):
759         """
760         Instantiates the CreateImage object that is responsible for downloading
761         and creating an OS image file within OpenStack
762         """
763         self.neutron = neutron_utils.neutron_client(self.os_creds)
764         self.floating_ip = None
765
766     def tearDown(self):
767         """
768         Cleans the image and downloaded image file
769         """
770         if self.floating_ip:
771             neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
772
773     def test_floating_ips(self):
774         """
775         Tests the creation of a floating IP
776         :return:
777         """
778         initial_fips = neutron_utils.get_floating_ips(self.neutron)
779
780         self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
781                                                             self.ext_net_name)
782         all_fips = neutron_utils.get_floating_ips(self.neutron)
783         self.assertEqual(len(initial_fips) + 1, len(all_fips))
784         returned = neutron_utils.get_floating_ip(self.neutron,
785                                                  self.floating_ip)
786         self.assertEqual(self.floating_ip.id, returned.id)
787         self.assertEqual(self.floating_ip.ip, returned.ip)
788
789
790 """
791 Validation routines
792 """
793
794
795 def validate_network(neutron, name, exists):
796     """
797     Returns true if a network for a given name DOES NOT exist if the exists
798     parameter is false conversely true. Returns false if a network for a given
799     name DOES exist if the exists parameter is true conversely false.
800     :param neutron: The neutron client
801     :param name: The expected network name
802     :param exists: Whether or not the network name should exist or not
803     :return: True/False
804     """
805     network = neutron_utils.get_network(neutron, name)
806     if exists and network:
807         return True
808     if not exists and not network:
809         return True
810     return False
811
812
813 def validate_subnet(neutron, name, cidr, exists):
814     """
815     Returns true if a subnet for a given name DOES NOT exist if the exists
816     parameter is false conversely true. Returns false if a subnet for a given
817     name DOES exist if the exists parameter is true conversely false.
818     :param neutron: The neutron client
819     :param name: The expected subnet name
820     :param cidr: The expected CIDR value
821     :param exists: Whether or not the network name should exist or not
822     :return: True/False
823     """
824     subnet = neutron_utils.get_subnet_by_name(neutron, name)
825     if exists and subnet:
826         return subnet.cidr == cidr
827     if not exists and not subnet:
828         return True
829     return False
830
831
832 def validate_router(neutron, name, exists):
833     """
834     Returns true if a router for a given name DOES NOT exist if the exists
835     parameter is false conversely true. Returns false if a router for a given
836     name DOES exist if the exists parameter is true conversely false.
837     :param neutron: The neutron client
838     :param name: The expected router name
839     :param exists: Whether or not the network name should exist or not
840     :return: True/False
841     """
842     router = neutron_utils.get_router_by_name(neutron, name)
843     if exists and router:
844         return True
845     return False
846
847
848 def validate_interface_router(interface_router, router, subnet):
849     """
850     Returns true if the router ID & subnet ID have been properly included into
851     the interface router object
852     :param interface_router: the SNAPS-OO InterfaceRouter domain object
853     :param router: to validate against the interface_router
854     :param subnet: to validate against the interface_router
855     :return: True if both IDs match else False
856     """
857     subnet_id = interface_router.subnet_id
858     router_id = interface_router.port_id
859
860     return subnet.id == subnet_id and router.id == router_id
861
862
863 def validate_port(neutron, port_obj, this_port_name):
864     """
865     Returns true if a port for a given name DOES NOT exist if the exists
866     parameter is false conversely true. Returns false if a port for a given
867     name DOES exist if the exists parameter is true conversely false.
868     :param neutron: The neutron client
869     :param port_obj: The port object to lookup
870     :param this_port_name: The expected router name
871     :return: True/False
872     """
873     os_ports = neutron.list_ports()
874     for os_port, os_port_insts in os_ports.items():
875         for os_inst in os_port_insts:
876             if os_inst['id'] == port_obj.id:
877                 return os_inst['name'] == this_port_name
878     return False