Created domain class for routers.
[snaps.git] / snaps / openstack / utils / tests / neutron_utils_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 import uuid
16
17 from snaps.openstack import create_router
18 from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
19     PortSettings
20 from snaps.openstack.create_security_group import SecurityGroupSettings, \
21     SecurityGroupRuleSettings, Direction
22 from snaps.openstack.tests import openstack_tests
23 from snaps.openstack.tests import validation_utils
24 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
25 from snaps.openstack.utils import keystone_utils
26 from snaps.openstack.utils import neutron_utils
27
28 __author__ = 'spisarski'
29
30 ip_1 = '10.55.1.100'
31 ip_2 = '10.55.1.200'
32
33
class NeutronSmokeTests(OSComponentTestCase):
    """
    Smoke tests verifying that a neutron client can talk to the cloud
    """

    def test_neutron_connect_success(self):
        """
        Ensures a client built from the configured credentials can list
        networks and that the configured external network is among them.
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        net_list = neutron.list_networks().get('networks')

        net_names = [net.get('name') for net in net_list]
        self.assertTrue(self.ext_net_name in net_names)

    def test_neutron_connect_fail(self):
        """
        Ensures a client built from bogus credentials cannot list networks.
        """
        from snaps.openstack.os_credentials import OSCreds

        bogus = dict(username='user', password='pass', auth_url='url',
                     project_name='project')
        # Credential construction, client creation and the API call all
        # happen inside the context: a failure in any of them is accepted.
        with self.assertRaises(Exception):
            neutron_utils.neutron_client(OSCreds(**bogus)).list_networks()

    def test_retrieve_ext_network_name(self):
        """
        Tests neutron_utils.get_external_networks() to ensure the configured
        self.ext_net_name is the name of one of the returned networks
        :return:
        """
        neutron = neutron_utils.neutron_client(self.os_creds)
        ext_networks = neutron_utils.get_external_networks(neutron)
        self.assertTrue(any(
            ext_net['network']['name'] == self.ext_net_name
            for ext_net in ext_networks))
80
81
class NeutronUtilsNetworkTests(OSComponentTestCase):
    """
    Tests for creating networks via neutron_utils.py
    """

    def setUp(self):
        """
        Builds a uniquely named network configuration and a neutron client
        """
        prefix = '{}-{}'.format(self.__class__.__name__, uuid.uuid4())
        self.port_name = prefix + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=prefix + '-pub-net')

    def tearDown(self):
        """
        Deletes the network created by the test, if there is one
        """
        if not self.network:
            return

        neutron_utils.delete_network(self.neutron, self.network)
        validate_network(self.neutron, self.network['network']['name'],
                         False)

    def test_create_network(self):
        """
        Tests neutron_utils.create_network() with valid network settings
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)

        self.assertEqual(net_settings.name, self.network['network']['name'])
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))

    def test_create_network_empty_name(self):
        """
        Expects an Exception when the network settings are built with an
        empty name and passed to neutron_utils.create_network()
        """
        with self.assertRaises(Exception):
            empty_name_settings = NetworkSettings(name='')
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=empty_name_settings)

    def test_create_network_null_name(self):
        """
        Expects an Exception when the network settings are built without a
        name and passed to neutron_utils.create_network()
        """
        with self.assertRaises(Exception):
            nameless_settings = NetworkSettings()
            self.network = neutron_utils.create_network(
                self.neutron, self.os_creds,
                network_settings=nameless_settings)
135
136
class NeutronUtilsSubnetTests(OSComponentTestCase):
    """
    Tests for creating networks with subnets via neutron_utils.py
    """

    def setUp(self):
        """
        Builds a uniquely named network/subnet configuration and a neutron
        client
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects: the subnet first, then the
        network it belongs to
        """
        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[
                                0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'],
                             False)

    def _create_network(self):
        """
        Creates the configured network, stores it in self.network so that
        tearDown removes it, and asserts that it exists under the expected
        name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network['network']['name'])
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))

    def test_create_subnet(self):
        """
        Tests the neutron_utils.create_subnet() function
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting,
            self.os_creds, network=self.network)
        validate_subnet(
            self.neutron,
            subnet_setting.name,
            subnet_setting.cidr, True)

    def test_create_subnet_null_name(self):
        """
        Tests that constructing SubnetSettings without a name raises an
        Exception
        """
        self._create_network()

        with self.assertRaises(Exception):
            SubnetSettings(cidr=self.net_config.subnet_cidr)

    def test_create_subnet_empty_name(self):
        """
        Creates the configured subnet and then runs the subnet validation
        with an empty name.

        NOTE(review): despite the test name, the subnet is created with the
        configured (non-empty) name and the validation result is not
        asserted - confirm the intended expectation.
        """
        self._create_network()

        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        # Keep a handle on the subnet so tearDown deletes and validates it
        # just like in test_create_subnet
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting,
            self.os_creds, network=self.network)
        validate_subnet(self.neutron, '',
                        subnet_setting.cidr, True)

    def test_create_subnet_null_cidr(self):
        """
        Tests for an Exception when the subnet CIDR value is None, raised
        either by SubnetSettings or by neutron_utils.create_subnet()
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr=None,
                                      name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)

    def test_create_subnet_empty_cidr(self):
        """
        Tests for an Exception when the subnet CIDR value is empty, raised
        either by SubnetSettings or by neutron_utils.create_subnet()
        """
        self._create_network()

        with self.assertRaises(Exception):
            sub_sets = SubnetSettings(cidr='',
                                      name=self.net_config.subnet_name)
            neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
                                        network=self.network)
261
262
class NeutronUtilsRouterTests(OSComponentTestCase):
    """
    Tests for creating routers, router interfaces and ports via
    neutron_utils.py
    """

    def setUp(self):
        """
        Builds a uniquely named network/subnet/router configuration and a
        neutron client
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.port_name = str(guid) + '-port'
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.network = None
        self.subnet = None
        self.port = None
        self.router = None
        self.interface_router = None
        self.net_config = openstack_tests.get_pub_net_config(
            net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
            router_name=guid + '-pub-router', external_net=self.ext_net_name)

    def tearDown(self):
        """
        Cleans the remote OpenStack objects in dependency order: router
        interface, router, port, subnet and finally the network
        """
        if self.interface_router:
            neutron_utils.remove_interface_router(self.neutron, self.router,
                                                  self.subnet)

        if self.router:
            neutron_utils.delete_router(self.neutron, self.router)
            validate_router(self.neutron, self.router.name, False)

        if self.port:
            neutron_utils.delete_port(self.neutron, self.port)

        if self.subnet:
            neutron_utils.delete_subnet(self.neutron, self.subnet)
            validate_subnet(self.neutron, self.subnet.get('name'),
                            self.net_config.network_settings.subnet_settings[
                                0].cidr, False)

        if self.network:
            neutron_utils.delete_network(self.neutron, self.network)
            validate_network(self.neutron, self.network['network']['name'],
                             False)

    def _create_network(self):
        """
        Creates the configured network, stores it in self.network so that
        tearDown removes it, and asserts that it exists under the expected
        name
        """
        net_settings = self.net_config.network_settings
        self.network = neutron_utils.create_network(
            self.neutron, self.os_creds, net_settings)
        self.assertEqual(net_settings.name, self.network['network']['name'])
        self.assertTrue(
            validate_network(self.neutron, net_settings.name, True))

    def _create_subnet(self):
        """
        Creates the first configured subnet on self.network, stores it in
        self.subnet and runs the subnet validation
        :return: the subnet settings that were used
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.subnet = neutron_utils.create_subnet(
            self.neutron, subnet_setting, self.os_creds, self.network)
        validate_subnet(self.neutron, subnet_setting.name,
                        subnet_setting.cidr, True)
        return subnet_setting

    def _create_router(self):
        """
        Creates the configured router, stores it in self.router and runs the
        router validation
        """
        self.router = neutron_utils.create_router(
            self.neutron, self.os_creds, self.net_config.router_settings)
        validate_router(self.neutron, self.net_config.router_settings.name,
                        True)

    def _create_port(self, **port_kwargs):
        """
        Creates a port from PortSettings built with the given keyword
        arguments and stores it in self.port
        :param port_kwargs: passed unchanged to the PortSettings constructor
        """
        self.port = neutron_utils.create_port(
            self.neutron, self.os_creds, PortSettings(**port_kwargs))

    def test_create_router_simple(self):
        """
        Tests the neutron_utils.create_router() function with the router
        settings generated in setUp
        """
        self._create_router()

    def test_create_router_with_public_interface(self):
        """
        Tests the neutron_utils.create_router() function after rebuilding
        the network configuration with the external network name
        """
        subnet_setting = self.net_config.network_settings.subnet_settings[0]
        self.net_config = openstack_tests.OSNetworkConfig(
            self.net_config.network_settings.name,
            subnet_setting.name,
            subnet_setting.cidr,
            self.net_config.router_settings.name,
            self.ext_net_name)
        self._create_router()
        # TODO - Add validation that the router gateway has been set

    def test_create_router_empty_name(self):
        """
        Tests for an Exception when the router settings are built with an
        empty name and passed to neutron_utils.create_router()
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings(name='')
            self.router = neutron_utils.create_router(self.neutron,
                                                      self.os_creds,
                                                      this_router_settings)

    def test_create_router_null_name(self):
        """
        Tests for an Exception when the router settings are built without a
        name and passed to neutron_utils.create_router()
        """
        with self.assertRaises(Exception):
            this_router_settings = create_router.RouterSettings()
            self.router = neutron_utils.create_router(self.neutron,
                                                      self.os_creds,
                                                      this_router_settings)
            # Only reached when nothing above raised
            validate_router(self.neutron, None, True)

    def test_add_interface_router(self):
        """
        Tests the neutron_utils.add_interface_router() function
        """
        self._create_network()
        self._create_subnet()
        self._create_router()

        self.interface_router = neutron_utils.add_interface_router(
            self.neutron, self.router, self.subnet)
        validate_interface_router(self.interface_router, self.router,
                                  self.subnet)

    def test_add_interface_router_null_router(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the router value is None
        """
        self._create_network()
        self._create_subnet()

        # self.router is still None here: no router has been created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_add_interface_router_null_subnet(self):
        """
        Tests the neutron_utils.add_interface_router() function for an
        Exception when the subnet value is None
        """
        self._create_network()
        self._create_router()

        # self.subnet is still None here: no subnet has been created
        with self.assertRaises(Exception):
            self.interface_router = neutron_utils.add_interface_router(
                self.neutron, self.router, self.subnet)

    def test_create_port(self):
        """
        Tests the neutron_utils.create_port() function
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        self._create_port(
            name=self.port_name,
            ip_addrs=[{
                'subnet_name': subnet_setting.name,
                'ip': ip_1}],
            network_name=self.net_config.network_settings.name)
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_empty_name(self):
        """
        Tests the neutron_utils.create_port() function

        NOTE(review): despite the test name, the port is created with
        self.port_name, which makes this test equivalent to
        test_create_port - confirm the intended expectation.
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        self._create_port(
            name=self.port_name,
            network_name=self.net_config.network_settings.name,
            ip_addrs=[{
                'subnet_name': subnet_setting.name,
                'ip': ip_1}])
        validate_port(self.neutron, self.port, self.port_name)

    def test_create_port_null_name(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        no port name is given
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': ip_1}])

    def test_create_port_null_network_object(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the named network has not been created
        """
        with self.assertRaises(Exception):
            self._create_port(
                name=self.port_name,
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name':
                        self.net_config.network_settings.subnet_settings[
                            0].name,
                    'ip': ip_1}])

    def test_create_port_null_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is None
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(
                name=self.port_name,
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': None}])

    def test_create_port_invalid_ip(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the IP value is not an IP address
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(
                name=self.port_name,
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': 'foo'}])

    def test_create_port_invalid_ip_to_subnet(self):
        """
        Tests the neutron_utils.create_port() function for an Exception when
        the fixed IP 10.197.123.100 is requested on the configured subnet
        (presumably outside the subnet's CIDR - confirm against
        openstack_tests.get_pub_net_config)
        """
        self._create_network()
        subnet_setting = self._create_subnet()

        with self.assertRaises(Exception):
            self._create_port(
                name=self.port_name,
                network_name=self.net_config.network_settings.name,
                ip_addrs=[{
                    'subnet_name': subnet_setting.name,
                    'ip': '10.197.123.100'}])
633
634
635 class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
636     """
637     Test for creating security groups via neutron_utils.py
638     """
639
640     def setUp(self):
641         guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
642         self.sec_grp_name = guid + 'name'
643
644         self.security_groups = list()
645         self.security_group_rules = list()
646         self.neutron = neutron_utils.neutron_client(self.os_creds)
647         self.keystone = keystone_utils.keystone_client(self.os_creds)
648
649     def tearDown(self):
650         """
651         Cleans the remote OpenStack objects
652         """
653         for rule in self.security_group_rules:
654             neutron_utils.delete_security_group_rule(self.neutron, rule)
655
656         for security_group in self.security_groups:
657             try:
658                 neutron_utils.delete_security_group(self.neutron,
659                                                     security_group)
660             except:
661                 pass
662
663     def test_create_delete_simple_sec_grp(self):
664         """
665         Tests the neutron_utils.create_security_group() function
666         """
667         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
668         security_group = neutron_utils.create_security_group(self.neutron,
669                                                              self.keystone,
670                                                              sec_grp_settings)
671
672         self.assertTrue(sec_grp_settings.name, security_group.name)
673
674         sec_grp_get = neutron_utils.get_security_group(self.neutron,
675                                                        sec_grp_settings.name)
676         self.assertIsNotNone(sec_grp_get)
677         self.assertTrue(validation_utils.objects_equivalent(
678             security_group, sec_grp_get))
679
680         neutron_utils.delete_security_group(self.neutron, security_group)
681         sec_grp_get = neutron_utils.get_security_group(self.neutron,
682                                                        sec_grp_settings.name)
683         self.assertIsNone(sec_grp_get)
684
685     def test_create_sec_grp_no_name(self):
686         """
687         Tests the SecurityGroupSettings constructor and
688         neutron_utils.create_security_group() function to ensure that
689         attempting to create a security group without a name will raise an
690         exception
691         """
692         with self.assertRaises(Exception):
693             sec_grp_settings = SecurityGroupSettings()
694             self.security_groups.append(
695                 neutron_utils.create_security_group(self.neutron,
696                                                     self.keystone,
697                                                     sec_grp_settings))
698
699     def test_create_sec_grp_no_rules(self):
700         """
701         Tests the neutron_utils.create_security_group() function
702         """
703         sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
704                                                  description='hello group')
705         self.security_groups.append(
706             neutron_utils.create_security_group(self.neutron, self.keystone,
707                                                 sec_grp_settings))
708
709         self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
710         self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
711
712         sec_grp_get = neutron_utils.get_security_group(self.neutron,
713                                                        sec_grp_settings.name)
714         self.assertIsNotNone(sec_grp_get)
715         self.assertEqual(self.security_groups[0], sec_grp_get)
716
717     def test_create_sec_grp_one_rule(self):
718         """
719         Tests the neutron_utils.create_security_group() function
720         """
721
722         sec_grp_rule_settings = SecurityGroupRuleSettings(
723             sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
724         sec_grp_settings = SecurityGroupSettings(
725             name=self.sec_grp_name, description='hello group',
726             rule_settings=[sec_grp_rule_settings])
727
728         self.security_groups.append(
729             neutron_utils.create_security_group(self.neutron, self.keystone,
730                                                 sec_grp_settings))
731         free_rules = neutron_utils.get_rules_by_security_group(
732             self.neutron, self.security_groups[0])
733         for free_rule in free_rules:
734             self.security_group_rules.append(free_rule)
735
736         self.security_group_rules.append(
737             neutron_utils.create_security_group_rule(
738                 self.neutron, sec_grp_settings.rule_settings[0]))
739
740         # Refresh object so it is populated with the newly added rule
741         security_group = neutron_utils.get_security_group(
742             self.neutron, sec_grp_settings.name)
743
744         rules = neutron_utils.get_rules_by_security_group(self.neutron,
745                                                           security_group)
746
747         self.assertTrue(
748             validation_utils.objects_equivalent(
749                  self.security_group_rules, rules))
750
751         self.assertTrue(sec_grp_settings.name, security_group.name)
752
753         sec_grp_get = neutron_utils.get_security_group(self.neutron,
754                                                        sec_grp_settings.name)
755         self.assertIsNotNone(sec_grp_get)
756         self.assertEqual(security_group, sec_grp_get)
757
758     def test_get_sec_grp_by_id(self):
759         """
760         Tests the neutron_utils.create_security_group() function
761         """
762
763         self.security_groups.append(neutron_utils.create_security_group(
764             self.neutron, self.keystone,
765             SecurityGroupSettings(name=self.sec_grp_name + '-1',
766                                   description='hello group')))
767         self.security_groups.append(neutron_utils.create_security_group(
768             self.neutron, self.keystone,
769             SecurityGroupSettings(name=self.sec_grp_name + '-2',
770                                   description='hello group')))
771
772         sec_grp_1b = neutron_utils.get_security_group_by_id(
773             self.neutron, self.security_groups[0].id)
774         sec_grp_2b = neutron_utils.get_security_group_by_id(
775             self.neutron, self.security_groups[1].id)
776
777         self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
778         self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
779
780
class NeutronUtilsFloatingIpTests(OSComponentTestCase):
    """
    Tests the floating IP functions found in neutron_utils
    """

    def setUp(self):
        """
        Creates the neutron client used by the test and clears the floating
        IP reference that tearDown inspects
        """
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.floating_ip = None

    def tearDown(self):
        """
        Deletes the floating IP when one was created by the test
        """
        if not self.floating_ip:
            return
        neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)

    def test_floating_ips(self):
        """
        Tests the creation of a floating IP on the external network: the
        number of floating IPs must grow by one and the new floating IP must
        be retrievable with the same ID and IP
        """
        fips_before = neutron_utils.get_floating_ips(self.neutron)

        self.floating_ip = neutron_utils.create_floating_ip(
            self.neutron, self.ext_net_name)

        fips_after = neutron_utils.get_floating_ips(self.neutron)
        self.assertEqual(len(fips_before) + 1, len(fips_after))

        fetched_fip = neutron_utils.get_floating_ip(
            self.neutron, self.floating_ip)
        self.assertEqual(self.floating_ip.id, fetched_fip.id)
        self.assertEqual(self.floating_ip.ip, fetched_fip.ip)
816
817
818 """
819 Validation routines
820 """
821
822
def validate_network(neutron, name, exists):
    """
    Checks whether the presence of a network with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected network name
    :param exists: Whether or not a network with that name is expected to
                   exist
    :return: True when the network was found and exists is true, or when it
             was not found and exists is false; otherwise False
    """
    found = neutron_utils.get_network(neutron, name)
    # The check passes exactly when "was found" and "should exist" agree
    return bool(found) == bool(exists)
839
840
def validate_subnet(neutron, name, cidr, exists):
    """
    Checks whether the presence of a subnet with the given name matches the
    expectation expressed by the exists parameter and, when the subnet is
    expected and found, whether its CIDR has the expected value.
    :param neutron: The neutron client
    :param name: The expected subnet name
    :param cidr: The expected CIDR value
    :param exists: Whether or not a subnet with that name is expected to
                   exist
    :return: True/False
    """
    found = neutron_utils.get_subnet_by_name(neutron, name)
    if not found:
        # Nothing was found, which is only correct when nothing was expected
        return not exists
    if not exists:
        # A subnet was found although none was expected
        return False
    return found.get('cidr') == cidr
858
859
def validate_router(neutron, name, exists):
    """
    Checks whether the presence of a router with the given name matches the
    expectation expressed by the exists parameter.
    :param neutron: The neutron client
    :param name: The expected router name
    :param exists: Whether or not a router with that name is expected to
                   exist
    :return: True when the router was found and exists is true, or when it
             was not found and exists is false; otherwise False
    """
    router = neutron_utils.get_router_by_name(neutron, name)
    if exists and router:
        return True
    # An absent router is the expected outcome when exists is false; without
    # this branch the function could never confirm a deletion
    if not exists and not router:
        return True
    return False
874
875
def validate_interface_router(interface_router, router, subnet):
    """
    Checks that the IDs held by the interface router object match the given
    subnet and router
    :param interface_router: the SNAPS-OO InterfaceRouter domain object
    :param router: to validate against the interface_router
    :param subnet: to validate against the interface_router
    :return: True if both IDs match else False
    """
    expected_subnet_id = interface_router.subnet_id
    # NOTE(review): the value compared with the router's ID is read from
    # port_id - confirm this is the intended attribute
    expected_router_id = interface_router.port_id

    if subnet.get('id') == expected_subnet_id:
        return router.get('id') == expected_router_id
    return False
889
890
def validate_port(neutron, port_obj, this_port_name):
    """
    Looks through the ports returned by the neutron client for the first one
    whose ID matches the given port object and checks its name.
    :param neutron: The neutron client
    :param port_obj: The port object to lookup (matched on its id attribute)
    :param this_port_name: The expected port name
    :return: True if a port with the same ID was found and its name equals
             this_port_name, else False
    """
    listing = neutron.list_ports()
    # The listing maps keys to collections of port dicts; only the values
    # are needed for the search
    candidates = (
        entry
        for entries in listing.values()
        for entry in entries
        if entry['id'] == port_obj.id
    )
    match = next(candidates, None)
    return match is not None and match['name'] == this_port_name