3 # All rights reserved. This program and the accompanying materials
4 # are made available under the terms of the Apache License, Version 2.0
5 # which accompanies this distribution, and is available at
6 # http://www.apache.org/licenses/LICENSE-2.0
15 from functest.utils import openstack_utils
16 from functest.utils.constants import CONST
19 class OSUtilsTesting(unittest.TestCase):
def _get_env_cred_dict(self, os_prefix=''):
    """Build the OS_* variable name -> credential key mapping.

    Every value is ``os_prefix`` concatenated with the credential key, so
    the default empty prefix yields the bare key names.

    :param os_prefix: string prepended to each value
    :returns: dict keyed by OS_* environment variable name
    """
    # (environment variable name, credential key) pairs
    env_to_key = (
        ('OS_USERNAME', 'username'),
        ('OS_PASSWORD', 'password'),
        ('OS_AUTH_URL', 'auth_url'),
        ('OS_TENANT_NAME', 'tenant_name'),
        ('OS_USER_DOMAIN_NAME', 'user_domain_name'),
        ('OS_PROJECT_DOMAIN_NAME', 'project_domain_name'),
        ('OS_PROJECT_NAME', 'project_name'),
        ('OS_ENDPOINT_TYPE', 'endpoint_type'),
        ('OS_REGION_NAME', 'region_name'),
        ('OS_CACERT', 'https_cacert'),
        ('OS_INSECURE', 'https_insecure'),
    )
    return {env_name: os_prefix + cred_key
            for env_name, cred_key in env_to_key}
def _get_os_env_vars(self):
    """Return the fixed credential-key -> 'test_*' value fixture.

    :returns: dict mapping each credential key to its test value
    """
    fixture = dict(
        username='test_username',
        password='test_password',
        auth_url='test_auth_url',
        tenant_name='test_tenant_name',
        user_domain_name='test_user_domain_name',
        project_domain_name='test_project_domain_name',
        project_name='test_project_name',
        endpoint_type='test_endpoint_type',
        region_name='test_region_name',
        https_cacert='test_https_cacert',
        https_insecure='test_https_insecure',
    )
    return fixture
46 self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
47 self.tenant_name = 'test_tenant_name'
48 self.env_cred_dict = self._get_env_cred_dict()
49 self.os_environs = self._get_env_cred_dict(os_prefix='test_')
50 self.os_env_vars = self._get_os_env_vars()
52 mock_obj = mock.Mock()
53 attrs = {'name': 'test_flavor',
56 mock_obj.configure_mock(**attrs)
57 self.flavor = mock_obj
59 mock_obj = mock.Mock()
60 attrs = {'name': 'test_aggregate',
62 'hosts': ['host_name']}
63 mock_obj.configure_mock(**attrs)
64 self.aggregate = mock_obj
66 mock_obj = mock.Mock()
67 attrs = {'id': 'instance_id',
68 'name': 'test_instance',
70 mock_obj.configure_mock(**attrs)
71 self.instance = mock_obj
73 mock_obj = mock.Mock()
74 attrs = {'id': 'azone_id',
75 'zoneName': 'test_azone',
77 mock_obj.configure_mock(**attrs)
78 self.availability_zone = mock_obj
80 mock_obj = mock.Mock()
81 attrs = {'floating_network_id': 'floating_id',
82 'floating_ip_address': 'test_floating_ip'}
83 mock_obj.configure_mock(**attrs)
84 self.floating_ip = mock_obj
86 mock_obj = mock.Mock()
87 attrs = {'id': 'hypervisor_id',
88 'hypervisor_hostname': 'test_hostname',
90 mock_obj.configure_mock(**attrs)
91 self.hypervisor = mock_obj
93 mock_obj = mock.Mock()
94 attrs = {'id': 'image_id',
96 mock_obj.configure_mock(**attrs)
99 mock_obj = mock.Mock()
100 self.mock_return = mock_obj
102 self.nova_client = mock.Mock()
103 attrs = {'servers.list.return_value': [self.instance],
104 'servers.get.return_value': self.instance,
105 'servers.find.return_value': self.instance,
106 'servers.create.return_value': self.instance,
107 'flavors.list.return_value': [self.flavor],
108 'flavors.find.return_value': self.flavor,
109 'servers.add_floating_ip.return_value': mock.Mock(),
110 'servers.force_delete.return_value': mock.Mock(),
111 'aggregates.list.return_value': [self.aggregate],
112 'aggregates.add_host.return_value': mock.Mock(),
113 'aggregates.remove_host.return_value': mock.Mock(),
114 'aggregates.get.return_value': self.aggregate,
115 'aggregates.delete.return_value': mock.Mock(),
116 'availability_zones.list.return_value':
117 [self.availability_zone],
118 'hypervisors.list.return_value': [self.hypervisor],
119 'create.return_value': mock.Mock(),
120 'add_security_group.return_value': mock.Mock(),
121 'images.list.return_value': [self.image],
122 'images.delete.return_value': mock.Mock(),
124 self.nova_client.configure_mock(**attrs)
126 self.glance_client = mock.Mock()
127 attrs = {'images.list.return_value': [self.image],
128 'images.create.return_value': self.image,
129 'images.upload.return_value': mock.Mock()}
130 self.glance_client.configure_mock(**attrs)
132 mock_obj = mock.Mock()
133 attrs = {'id': 'volume_id',
134 'name': 'test_volume'}
135 mock_obj.configure_mock(**attrs)
136 self.volume = mock_obj
138 self.cinder_client = mock.Mock()
139 attrs = {'volumes.list.return_value': [self.volume],
140 'quotas.update.return_value': mock.Mock(),
141 'volumes.detach.return_value': mock.Mock(),
142 'volumes.force_delete.return_value': mock.Mock(),
143 'volumes.delete.return_value': mock.Mock()
145 self.cinder_client.configure_mock(**attrs)
147 self.resource = mock.Mock()
148 attrs = {'id': 'resource_test_id',
149 'name': 'resource_test_name'
152 self.heat_client = mock.Mock()
153 attrs = {'resources.get.return_value': self.resource}
154 self.heat_client.configure_mock(**attrs)
156 mock_obj = mock.Mock()
157 attrs = {'id': 'tenant_id',
158 'name': 'test_tenant'}
159 mock_obj.configure_mock(**attrs)
160 self.tenant = mock_obj
162 mock_obj = mock.Mock()
163 attrs = {'id': 'user_id',
165 mock_obj.configure_mock(**attrs)
168 mock_obj = mock.Mock()
169 attrs = {'id': 'role_id',
171 mock_obj.configure_mock(**attrs)
174 mock_obj = mock.Mock()
175 attrs = {'id': 'domain_id',
176 'name': 'test_domain'}
177 mock_obj.configure_mock(**attrs)
178 self.domain = mock_obj
180 self.keystone_client = mock.Mock()
181 attrs = {'projects.list.return_value': [self.tenant],
182 'tenants.list.return_value': [self.tenant],
183 'users.list.return_value': [self.user],
184 'roles.list.return_value': [self.role],
185 'domains.list.return_value': [self.domain],
186 'projects.create.return_value': self.tenant,
187 'tenants.create.return_value': self.tenant,
188 'users.create.return_value': self.user,
189 'roles.grant.return_value': mock.Mock(),
190 'roles.add_user_role.return_value': mock.Mock(),
191 'projects.delete.return_value': mock.Mock(),
192 'tenants.delete.return_value': mock.Mock(),
193 'users.delete.return_value': mock.Mock(),
195 self.keystone_client.configure_mock(**attrs)
197 self.router = {'id': 'router_id',
198 'name': 'test_router'}
200 self.subnet = {'id': 'subnet_id',
201 'name': 'test_subnet'}
203 self.networks = [{'id': 'network_id',
204 'name': 'test_network',
205 'router:external': False,
207 'subnets': [self.subnet]},
208 {'id': 'network_id1',
209 'name': 'test_network1',
210 'router:external': True,
212 'subnets': [self.subnet]}]
214 self.port = {'id': 'port_id',
217 self.sec_group = {'id': 'sec_group_id',
218 'name': 'test_sec_group'}
220 self.sec_group_rule = {'id': 'sec_group_rule_id',
221 'direction': 'direction',
222 'protocol': 'protocol',
223 'port_range_max': 'port_max',
224 'security_group_id': self.sec_group['id'],
225 'port_range_min': 'port_min'}
226 self.neutron_floatingip = {'id': 'fip_id',
227 'floating_ip_address': 'test_ip'}
228 self.neutron_client = mock.Mock()
229 attrs = {'list_networks.return_value': {'networks': self.networks},
230 'list_routers.return_value': {'routers': [self.router]},
231 'list_ports.return_value': {'ports': [self.port]},
232 'list_subnets.return_value': {'subnets': [self.subnet]},
233 'create_network.return_value': {'network': self.networks[0]},
234 'create_subnet.return_value': {'subnets': [self.subnet]},
235 'create_router.return_value': {'router': self.router},
236 'create_port.return_value': {'port': self.port},
237 'create_floatingip.return_value': {'floatingip':
238 self.neutron_floatingip},
239 'update_network.return_value': mock.Mock(),
240 'update_port.return_value': {'port': self.port},
241 'add_interface_router.return_value': mock.Mock(),
242 'add_gateway_router.return_value': mock.Mock(),
243 'delete_network.return_value': mock.Mock(),
244 'delete_subnet.return_value': mock.Mock(),
245 'delete_router.return_value': mock.Mock(),
246 'delete_port.return_value': mock.Mock(),
247 'remove_interface_router.return_value': mock.Mock(),
248 'remove_gateway_router.return_value': mock.Mock(),
249 'list_security_groups.return_value': {'security_groups':
251 'list_security_group_rules.'
252 'return_value': {'security_group_rules':
253 [self.sec_group_rule]},
254 'create_security_group_rule.return_value': mock.Mock(),
255 'create_security_group.return_value': {'security_group':
257 'update_quota.return_value': mock.Mock(),
258 'delete_security_group.return_value': mock.Mock(),
259 'list_floatingips.return_value': {'floatingips':
261 'delete_floatingip.return_value': mock.Mock(),
263 self.neutron_client.configure_mock(**attrs)
265 self.empty_client = mock.Mock()
266 attrs = {'list_networks.return_value': {'networks': []},
267 'list_routers.return_value': {'routers': []},
268 'list_ports.return_value': {'ports': []},
269 'list_subnets.return_value': {'subnets': []}}
270 self.empty_client.configure_mock(**attrs)
272 @mock.patch('functest.utils.openstack_utils.os.getenv',
274 def test_is_keystone_v3_missing_identity(self, mock_os_getenv):
275 self.assertEqual(openstack_utils.is_keystone_v3(), False)
277 @mock.patch('functest.utils.openstack_utils.os.getenv',
279 def test_is_keystone_v3_default(self, mock_os_getenv):
280 self.assertEqual(openstack_utils.is_keystone_v3(), True)
282 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
284 def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env):
285 exp_resp = self.env_vars
286 exp_resp.extend(['OS_TENANT_NAME'])
287 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
289 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
291 def test_get_rc_env_vars_default(self, mock_get_rc_env):
292 exp_resp = self.env_vars
293 exp_resp.extend(['OS_PROJECT_NAME',
294 'OS_USER_DOMAIN_NAME',
295 'OS_PROJECT_DOMAIN_NAME'])
296 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
298 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
299 def test_check_credentials_missing_env(self, mock_get_rc_env):
300 exp_resp = self.env_vars
301 exp_resp.extend(['OS_TENANT_NAME'])
302 mock_get_rc_env.return_value = exp_resp
303 with mock.patch.dict('functest.utils.openstack_utils.os.environ', {},
305 self.assertEqual(openstack_utils.check_credentials(), False)
307 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
308 def test_check_credentials_default(self, mock_get_rc_env):
309 exp_resp = ['OS_TENANT_NAME']
310 mock_get_rc_env.return_value = exp_resp
311 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
312 {'OS_TENANT_NAME': self.tenant_name},
314 self.assertEqual(openstack_utils.check_credentials(), True)
316 def test_get_env_cred_dict(self):
317 self.assertDictEqual(openstack_utils.get_env_cred_dict(),
320 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
321 def test_get_credentials_default(self, mock_get_rc_env):
322 mock_get_rc_env.return_value = self.env_cred_dict.keys()
323 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
326 self.assertDictEqual(openstack_utils.get_credentials(),
329 def _get_credentials_missing_env(self, var):
330 dic = copy.deepcopy(self.os_environs)
332 with mock.patch('functest.utils.openstack_utils.get_rc_env_vars',
333 return_value=self.env_cred_dict.keys()), \
334 mock.patch.dict('functest.utils.openstack_utils.os.environ',
337 self.assertRaises(openstack_utils.MissingEnvVar,
338 lambda: openstack_utils.get_credentials())
340 def test_get_credentials_missing_username(self):
341 self._get_credentials_missing_env('OS_USERNAME')
343 def test_get_credentials_missing_password(self):
344 self._get_credentials_missing_env('OS_PASSWORD')
346 def test_get_credentials_missing_auth_url(self):
347 self._get_credentials_missing_env('OS_AUTH_URL')
349 def test_get_credentials_missing_tenantname(self):
350 self._get_credentials_missing_env('OS_TENANT_NAME')
352 def test_get_credentials_missing_domainname(self):
353 self._get_credentials_missing_env('OS_USER_DOMAIN_NAME')
355 def test_get_credentials_missing_projectname(self):
356 self._get_credentials_missing_env('OS_PROJECT_NAME')
358 def test_get_credentials_missing_endpoint_type(self):
359 self._get_credentials_missing_env('OS_ENDPOINT_TYPE')
361 def _test_source_credentials(self, msg, key='OS_TENANT_NAME',
368 with mock.patch('six.moves.builtins.open',
369 mock.mock_open(read_data=msg),
371 m.return_value.__iter__ = lambda self: iter(self.readline, '')
372 openstack_utils.source_credentials(f)
373 m.assert_called_once_with(f, 'r')
374 self.assertEqual(os.environ[key], value)
376 def test_source_credentials(self):
377 self._test_source_credentials('OS_TENANT_NAME=admin')
378 self._test_source_credentials('OS_TENANT_NAME= admin')
379 self._test_source_credentials('OS_TENANT_NAME = admin')
380 self._test_source_credentials('OS_TENANT_NAME = "admin"')
381 self._test_source_credentials('export OS_TENANT_NAME=admin')
382 self._test_source_credentials('export OS_TENANT_NAME =admin')
383 self._test_source_credentials('export OS_TENANT_NAME = admin')
384 self._test_source_credentials('export OS_TENANT_NAME = "admin"')
385 # This test will fail as soon as rc_file is fixed
386 self._test_source_credentials(
387 'export "\'OS_TENANT_NAME\'" = "\'admin\'"')
389 @mock.patch('functest.utils.openstack_utils.os.getenv',
391 def test_get_keystone_client_version_missing_env(self, mock_os_getenv):
392 self.assertEqual(openstack_utils.get_keystone_client_version(),
393 openstack_utils.DEFAULT_API_VERSION)
395 @mock.patch('functest.utils.openstack_utils.logger.info')
396 @mock.patch('functest.utils.openstack_utils.os.getenv',
398 def test_get_keystone_client_version_default(self, mock_os_getenv,
400 self.assertEqual(openstack_utils.get_keystone_client_version(),
402 mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is "
403 "set in env as '%s'", '3')
405 @mock.patch('functest.utils.openstack_utils.get_session')
406 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
407 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
409 @mock.patch('functest.utils.openstack_utils.os.getenv',
410 return_value='public')
411 def test_get_keystone_client_with_interface(self, mock_os_getenv,
412 mock_keystoneclient_version,
415 mock_keystone_obj = mock.Mock()
416 mock_session_obj = mock.Mock()
417 mock_key_client.return_value = mock_keystone_obj
418 mock_get_session.return_value = mock_session_obj
419 self.assertEqual(openstack_utils.get_keystone_client(),
421 mock_key_client.assert_called_once_with('3',
422 session=mock_session_obj,
425 @mock.patch('functest.utils.openstack_utils.get_session')
426 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
427 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
429 @mock.patch('functest.utils.openstack_utils.os.getenv',
430 return_value='admin')
431 def test_get_keystone_client_no_interface(self, mock_os_getenv,
432 mock_keystoneclient_version,
435 mock_keystone_obj = mock.Mock()
436 mock_session_obj = mock.Mock()
437 mock_key_client.return_value = mock_keystone_obj
438 mock_get_session.return_value = mock_session_obj
439 self.assertEqual(openstack_utils.get_keystone_client(),
441 mock_key_client.assert_called_once_with('3',
442 session=mock_session_obj,
445 @mock.patch('functest.utils.openstack_utils.os.getenv',
447 def test_get_nova_client_version_missing_env(self, mock_os_getenv):
448 self.assertEqual(openstack_utils.get_nova_client_version(),
449 openstack_utils.DEFAULT_API_VERSION)
451 @mock.patch('functest.utils.openstack_utils.logger.info')
452 @mock.patch('functest.utils.openstack_utils.os.getenv',
454 def test_get_nova_client_version_default(self, mock_os_getenv,
456 self.assertEqual(openstack_utils.get_nova_client_version(),
458 mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is "
459 "set in env as '%s'", '3')
461 def test_get_nova_client(self):
462 mock_nova_obj = mock.Mock()
463 mock_session_obj = mock.Mock()
464 with mock.patch('functest.utils.openstack_utils'
465 '.get_nova_client_version', return_value='3'), \
466 mock.patch('functest.utils.openstack_utils'
467 '.novaclient.Client',
468 return_value=mock_nova_obj) \
469 as mock_nova_client, \
470 mock.patch('functest.utils.openstack_utils.get_session',
471 return_value=mock_session_obj):
472 self.assertEqual(openstack_utils.get_nova_client(),
474 mock_nova_client.assert_called_once_with('3',
475 session=mock_session_obj)
477 @mock.patch('functest.utils.openstack_utils.os.getenv',
479 def test_get_cinder_client_version_missing_env(self, mock_os_getenv):
480 self.assertEqual(openstack_utils.get_cinder_client_version(),
481 openstack_utils.DEFAULT_API_VERSION)
483 @mock.patch('functest.utils.openstack_utils.logger.info')
484 @mock.patch('functest.utils.openstack_utils.os.getenv',
486 def test_get_cinder_client_version_default(self, mock_os_getenv,
488 self.assertEqual(openstack_utils.get_cinder_client_version(),
490 mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is "
491 "set in env as '%s'", '3')
493 def test_get_cinder_client(self):
494 mock_cinder_obj = mock.Mock()
495 mock_session_obj = mock.Mock()
496 with mock.patch('functest.utils.openstack_utils'
497 '.get_cinder_client_version', return_value='3'), \
498 mock.patch('functest.utils.openstack_utils'
499 '.cinderclient.Client',
500 return_value=mock_cinder_obj) \
501 as mock_cind_client, \
502 mock.patch('functest.utils.openstack_utils.get_session',
503 return_value=mock_session_obj):
504 self.assertEqual(openstack_utils.get_cinder_client(),
506 mock_cind_client.assert_called_once_with('3',
507 session=mock_session_obj)
509 @mock.patch('functest.utils.openstack_utils.os.getenv',
511 def test_get_neutron_client_version_missing_env(self, mock_os_getenv):
512 self.assertEqual(openstack_utils.get_neutron_client_version(),
513 openstack_utils.DEFAULT_API_VERSION)
515 @mock.patch('functest.utils.openstack_utils.logger.info')
516 @mock.patch('functest.utils.openstack_utils.os.getenv',
518 def test_get_neutron_client_version_default(self, mock_os_getenv,
520 self.assertEqual(openstack_utils.get_neutron_client_version(),
522 mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is "
523 "set in env as '%s'", '3')
525 def test_get_neutron_client(self):
526 mock_neutron_obj = mock.Mock()
527 mock_session_obj = mock.Mock()
528 with mock.patch('functest.utils.openstack_utils'
529 '.get_neutron_client_version', return_value='3'), \
530 mock.patch('functest.utils.openstack_utils'
531 '.neutronclient.Client',
532 return_value=mock_neutron_obj) \
533 as mock_neut_client, \
534 mock.patch('functest.utils.openstack_utils.get_session',
535 return_value=mock_session_obj):
536 self.assertEqual(openstack_utils.get_neutron_client(),
538 mock_neut_client.assert_called_once_with('3',
539 session=mock_session_obj)
541 @mock.patch('functest.utils.openstack_utils.os.getenv',
543 def test_get_glance_client_version_missing_env(self, mock_os_getenv):
544 self.assertEqual(openstack_utils.get_glance_client_version(),
545 openstack_utils.DEFAULT_API_VERSION)
547 @mock.patch('functest.utils.openstack_utils.logger.info')
548 @mock.patch('functest.utils.openstack_utils.os.getenv',
550 def test_get_glance_client_version_default(self, mock_os_getenv,
552 self.assertEqual(openstack_utils.get_glance_client_version(),
554 mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is "
555 "set in env as '%s'", '3')
557 def test_get_glance_client(self):
558 mock_glance_obj = mock.Mock()
559 mock_session_obj = mock.Mock()
560 with mock.patch('functest.utils.openstack_utils'
561 '.get_glance_client_version', return_value='3'), \
562 mock.patch('functest.utils.openstack_utils'
563 '.glanceclient.Client',
564 return_value=mock_glance_obj) \
565 as mock_glan_client, \
566 mock.patch('functest.utils.openstack_utils.get_session',
567 return_value=mock_session_obj):
568 self.assertEqual(openstack_utils.get_glance_client(),
570 mock_glan_client.assert_called_once_with('3',
571 session=mock_session_obj)
573 @mock.patch('functest.utils.openstack_utils.os.getenv',
575 def test_get_heat_client_version_missing_env(self, mock_os_getenv):
576 self.assertEqual(openstack_utils.get_heat_client_version(),
577 openstack_utils.DEFAULT_HEAT_API_VERSION)
579 @mock.patch('functest.utils.openstack_utils.logger.info')
580 @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1')
581 def test_get_heat_client_version_default(self, mock_os_getenv,
583 self.assertEqual(openstack_utils.get_heat_client_version(), '1')
584 mock_logger_info.assert_called_once_with(
585 "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1')
587 def test_get_heat_client(self):
588 mock_heat_obj = mock.Mock()
589 mock_session_obj = mock.Mock()
590 with mock.patch('functest.utils.openstack_utils'
591 '.get_heat_client_version', return_value='1'), \
592 mock.patch('functest.utils.openstack_utils'
593 '.heatclient.Client',
594 return_value=mock_heat_obj) \
595 as mock_heat_client, \
596 mock.patch('functest.utils.openstack_utils.get_session',
597 return_value=mock_session_obj):
598 self.assertEqual(openstack_utils.get_heat_client(),
600 mock_heat_client.assert_called_once_with('1',
601 session=mock_session_obj)
603 def test_get_instances_default(self):
604 self.assertEqual(openstack_utils.get_instances(self.nova_client),
607 @mock.patch('functest.utils.openstack_utils.logger.error')
608 def test_get_instances_exception(self, mock_logger_error):
609 self.assertEqual(openstack_utils.
610 get_instances(Exception),
612 self.assertTrue(mock_logger_error.called)
614 def test_get_instance_status_default(self):
615 self.assertEqual(openstack_utils.get_instance_status(self.nova_client,
619 @mock.patch('functest.utils.openstack_utils.logger.error')
620 def test_get_instance_status_exception(self, mock_logger_error):
621 self.assertEqual(openstack_utils.
622 get_instance_status(Exception,
625 self.assertTrue(mock_logger_error.called)
627 def test_get_instance_by_name_default(self):
628 self.assertEqual(openstack_utils.
629 get_instance_by_name(self.nova_client,
633 @mock.patch('functest.utils.openstack_utils.logger.error')
634 def test_get_instance_by_name_exception(self, mock_logger_error):
635 self.assertEqual(openstack_utils.
636 get_instance_by_name(Exception,
639 self.assertTrue(mock_logger_error.called)
641 def test_get_flavor_id_default(self):
642 self.assertEqual(openstack_utils.
643 get_flavor_id(self.nova_client,
647 def test_get_flavor_id_by_ram_range_default(self):
648 self.assertEqual(openstack_utils.
649 get_flavor_id_by_ram_range(self.nova_client,
653 def test_get_aggregates_default(self):
654 self.assertEqual(openstack_utils.
655 get_aggregates(self.nova_client),
658 @mock.patch('functest.utils.openstack_utils.logger.error')
659 def test_get_aggregates_exception(self, mock_logger_error):
660 self.assertEqual(openstack_utils.
661 get_aggregates(Exception),
663 self.assertTrue(mock_logger_error.called)
665 def test_get_aggregate_id_default(self):
666 with mock.patch('functest.utils.openstack_utils.get_aggregates',
667 return_value=[self.aggregate]):
668 self.assertEqual(openstack_utils.
669 get_aggregate_id(self.nova_client,
673 @mock.patch('functest.utils.openstack_utils.logger.error')
674 def test_get_aggregate_id_exception(self, mock_logger_error):
675 self.assertEqual(openstack_utils.
676 get_aggregate_id(Exception,
679 self.assertTrue(mock_logger_error.called)
681 def test_get_availability_zone_names_default(self):
682 with mock.patch('functest.utils.openstack_utils'
683 '.get_availability_zones',
684 return_value=[self.availability_zone]):
685 self.assertEqual(openstack_utils.
686 get_availability_zone_names(self.nova_client),
689 @mock.patch('functest.utils.openstack_utils.logger.error')
690 def test_get_availability_zone_names_exception(self, mock_logger_error):
691 self.assertEqual(openstack_utils.
692 get_availability_zone_names(Exception),
694 self.assertTrue(mock_logger_error.called)
def test_get_availability_zones_default(self):
    # The helper is expected to return the single availability zone
    # fixture held on this test case.
    zones = openstack_utils.get_availability_zones(self.nova_client)
    self.assertEqual(zones, [self.availability_zone])
701 @mock.patch('functest.utils.openstack_utils.logger.error')
702 def test_get_availability_zones_exception(self, mock_logger_error):
703 self.assertEqual(openstack_utils.
704 get_availability_zones(Exception),
706 self.assertTrue(mock_logger_error.called)
708 def test_get_floating_ips_default(self):
709 self.assertEqual(openstack_utils.
710 get_floating_ips(self.neutron_client),
713 @mock.patch('functest.utils.openstack_utils.logger.error')
714 def test_get_floating_ips_exception(self, mock_logger_error):
715 self.assertEqual(openstack_utils.
716 get_floating_ips(Exception),
718 self.assertTrue(mock_logger_error.called)
720 def test_get_hypervisors_default(self):
721 self.assertEqual(openstack_utils.
722 get_hypervisors(self.nova_client),
725 @mock.patch('functest.utils.openstack_utils.logger.error')
726 def test_get_hypervisors_exception(self, mock_logger_error):
727 self.assertEqual(openstack_utils.
728 get_hypervisors(Exception),
730 self.assertTrue(mock_logger_error.called)
732 def test_create_aggregate_default(self):
733 self.assertTrue(openstack_utils.
734 create_aggregate(self.nova_client,
738 @mock.patch('functest.utils.openstack_utils.logger.error')
739 def test_create_aggregate_exception(self, mock_logger_error):
740 self.assertEqual(openstack_utils.
741 create_aggregate(Exception,
745 self.assertTrue(mock_logger_error.called)
747 def test_add_host_to_aggregate_default(self):
748 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
749 self.assertTrue(openstack_utils.
750 add_host_to_aggregate(self.nova_client,
754 @mock.patch('functest.utils.openstack_utils.logger.error')
755 def test_add_host_to_aggregate_exception(self, mock_logger_error):
756 self.assertEqual(openstack_utils.
757 add_host_to_aggregate(Exception,
761 self.assertTrue(mock_logger_error.called)
763 def test_create_aggregate_with_host_default(self):
764 with mock.patch('functest.utils.openstack_utils.create_aggregate'), \
765 mock.patch('functest.utils.openstack_utils.'
766 'add_host_to_aggregate'):
767 self.assertTrue(openstack_utils.
768 create_aggregate_with_host(self.nova_client,
773 @mock.patch('functest.utils.openstack_utils.logger.error')
774 def test_create_aggregate_with_host_exception(self, mock_logger_error):
775 with mock.patch('functest.utils.openstack_utils.create_aggregate',
776 side_effect=Exception):
777 self.assertEqual(openstack_utils.
778 create_aggregate_with_host(Exception,
783 self.assertTrue(mock_logger_error.called)
785 def test_create_instance_default(self):
786 with mock.patch('functest.utils.openstack_utils.'
788 return_value=self.nova_client):
789 self.assertEqual(openstack_utils.
790 create_instance('test_flavor',
795 @mock.patch('functest.utils.openstack_utils.logger.error')
796 def test_create_instance_exception(self, mock_logger_error):
797 with mock.patch('functest.utils.openstack_utils.'
799 return_value=self.nova_client):
800 self.nova_client.flavors.find.side_effect = Exception
801 self.assertEqual(openstack_utils.
802 create_instance('test_flavor',
806 self.assertTrue(mock_logger_error)
808 def test_create_floating_ip_default(self):
809 with mock.patch('functest.utils.openstack_utils.'
810 'get_external_net_id',
811 return_value='external_net_id'):
812 exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'}
813 self.assertEqual(openstack_utils.
814 create_floating_ip(self.neutron_client),
817 @mock.patch('functest.utils.openstack_utils.logger.error')
818 def test_create_floating_ip_exception(self, mock_logger_error):
819 with mock.patch('functest.utils.openstack_utils.'
820 'get_external_net_id',
821 return_value='external_net_id'):
822 self.assertEqual(openstack_utils.
823 create_floating_ip(Exception),
825 self.assertTrue(mock_logger_error)
827 def test_add_floating_ip_default(self):
828 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
829 self.assertTrue(openstack_utils.
830 add_floating_ip(self.nova_client,
832 'test_floatingip_addr'))
834 @mock.patch('functest.utils.openstack_utils.logger.error')
835 def test_add_floating_ip_exception(self, mock_logger_error):
836 self.assertFalse(openstack_utils.
837 add_floating_ip(Exception,
839 'test_floatingip_addr'))
840 self.assertTrue(mock_logger_error.called)
842 def test_delete_instance_default(self):
843 self.assertTrue(openstack_utils.
844 delete_instance(self.nova_client,
847 @mock.patch('functest.utils.openstack_utils.logger.error')
848 def test_delete_instance_exception(self, mock_logger_error):
849 self.assertFalse(openstack_utils.
850 delete_instance(Exception,
852 self.assertTrue(mock_logger_error.called)
854 def test_delete_floating_ip_default(self):
855 self.assertTrue(openstack_utils.
856 delete_floating_ip(self.neutron_client,
859 @mock.patch('functest.utils.openstack_utils.logger.error')
860 def test_delete_floating_ip_exception(self, mock_logger_error):
861 self.assertFalse(openstack_utils.
862 delete_floating_ip(Exception,
864 self.assertTrue(mock_logger_error.called)
866 def test_remove_host_from_aggregate_default(self):
867 with mock.patch('functest.utils.openstack_utils.'
869 self.assertTrue(openstack_utils.
870 remove_host_from_aggregate(self.nova_client,
874 @mock.patch('functest.utils.openstack_utils.logger.error')
875 def test_remove_host_from_aggregate_exception(self, mock_logger_error):
876 with mock.patch('functest.utils.openstack_utils.'
877 'get_aggregate_id', side_effect=Exception):
878 self.assertFalse(openstack_utils.
879 remove_host_from_aggregate(self.nova_client,
882 self.assertTrue(mock_logger_error.called)
884 def test_remove_hosts_from_aggregate_default(self):
885 with mock.patch('functest.utils.openstack_utils.'
886 'get_aggregate_id'), \
887 mock.patch('functest.utils.openstack_utils.'
888 'remove_host_from_aggregate',
891 openstack_utils.remove_hosts_from_aggregate(self.nova_client,
893 mock_method.assert_any_call(self.nova_client,
897 def test_delete_aggregate_default(self):
898 with mock.patch('functest.utils.openstack_utils.'
899 'remove_hosts_from_aggregate'):
900 self.assertTrue(openstack_utils.
901 delete_aggregate(self.nova_client,
904 @mock.patch('functest.utils.openstack_utils.logger.error')
905 def test_delete_aggregate_exception(self, mock_logger_error):
906 with mock.patch('functest.utils.openstack_utils.'
907 'remove_hosts_from_aggregate', side_effect=Exception):
908 self.assertFalse(openstack_utils.
909 delete_aggregate(self.nova_client,
911 self.assertTrue(mock_logger_error.called)
913 def test_get_network_list_default(self):
914 self.assertEqual(openstack_utils.
915 get_network_list(self.neutron_client),
918 def test_get_network_list_missing_network(self):
919 self.assertEqual(openstack_utils.
920 get_network_list(self.empty_client),
923 def test_get_router_list_default(self):
924 self.assertEqual(openstack_utils.
925 get_router_list(self.neutron_client),
928 def test_get_router_list_missing_router(self):
929 self.assertEqual(openstack_utils.
930 get_router_list(self.empty_client),
933 def test_get_port_list_default(self):
934 self.assertEqual(openstack_utils.
935 get_port_list(self.neutron_client),
938 def test_get_port_list_missing_port(self):
939 self.assertEqual(openstack_utils.
940 get_port_list(self.empty_client),
943 def test_get_network_id_default(self):
944 self.assertEqual(openstack_utils.
945 get_network_id(self.neutron_client,
949 def test_get_subnet_id_default(self):
950 self.assertEqual(openstack_utils.
951 get_subnet_id(self.neutron_client,
955 def test_get_router_id_default(self):
956 self.assertEqual(openstack_utils.
957 get_router_id(self.neutron_client,
961 def test_get_private_net_default(self):
962 self.assertEqual(openstack_utils.
963 get_private_net(self.neutron_client),
966 def test_get_private_net_missing_net(self):
967 self.assertEqual(openstack_utils.
968 get_private_net(self.empty_client),
971 def test_get_external_net_default(self):
972 self.assertEqual(openstack_utils.
973 get_external_net(self.neutron_client),
976 def test_get_external_net_missing_net(self):
977 self.assertEqual(openstack_utils.
978 get_external_net(self.empty_client),
981 def test_get_external_net_id_default(self):
982 self.assertEqual(openstack_utils.
983 get_external_net_id(self.neutron_client),
986 def test_get_external_net_id_missing_net(self):
987 self.assertEqual(openstack_utils.
988 get_external_net_id(self.empty_client),
991 def test_check_neutron_net_default(self):
992 self.assertTrue(openstack_utils.
993 check_neutron_net(self.neutron_client,
996 def test_check_neutron_net_missing_net(self):
997 self.assertFalse(openstack_utils.
998 check_neutron_net(self.empty_client,
1001 def test_create_neutron_net_default(self):
1002 self.assertEqual(openstack_utils.
1003 create_neutron_net(self.neutron_client,
1007 @mock.patch('functest.utils.openstack_utils.logger.error')
1008 def test_create_neutron_net_exception(self, mock_logger_error):
1009 self.assertEqual(openstack_utils.
1010 create_neutron_net(Exception,
1013 self.assertTrue(mock_logger_error.called)
1015 def test_create_neutron_subnet_default(self):
1016 self.assertEqual(openstack_utils.
1017 create_neutron_subnet(self.neutron_client,
1023 @mock.patch('functest.utils.openstack_utils.logger.error')
1024 def test_create_neutron_subnet_exception(self, mock_logger_error):
1025 self.assertEqual(openstack_utils.
1026 create_neutron_subnet(Exception,
1031 self.assertTrue(mock_logger_error.called)
1033 def test_create_neutron_router_default(self):
1034 self.assertEqual(openstack_utils.
1035 create_neutron_router(self.neutron_client,
1039 @mock.patch('functest.utils.openstack_utils.logger.error')
1040 def test_create_neutron_router_exception(self, mock_logger_error):
1041 self.assertEqual(openstack_utils.
1042 create_neutron_router(Exception,
1045 self.assertTrue(mock_logger_error.called)
1047 def test_create_neutron_port_default(self):
1048 self.assertEqual(openstack_utils.
1049 create_neutron_port(self.neutron_client,
1055 @mock.patch('functest.utils.openstack_utils.logger.error')
1056 def test_create_neutron_port_exception(self, mock_logger_error):
1057 self.assertEqual(openstack_utils.
1058 create_neutron_port(Exception,
1063 self.assertTrue(mock_logger_error.called)
1065 def test_update_neutron_net_default(self):
1066 self.assertTrue(openstack_utils.
1067 update_neutron_net(self.neutron_client,
1070 @mock.patch('functest.utils.openstack_utils.logger.error')
1071 def test_update_neutron_net_exception(self, mock_logger_error):
1072 self.assertFalse(openstack_utils.
1073 update_neutron_net(Exception,
1075 self.assertTrue(mock_logger_error.called)
1077 def test_update_neutron_port_default(self):
1078 self.assertEqual(openstack_utils.
1079 update_neutron_port(self.neutron_client,
1084 @mock.patch('functest.utils.openstack_utils.logger.error')
1085 def test_update_neutron_port_exception(self, mock_logger_error):
1086 self.assertEqual(openstack_utils.
1087 update_neutron_port(Exception,
1091 self.assertTrue(mock_logger_error.called)
1093 def test_add_interface_router_default(self):
1094 self.assertTrue(openstack_utils.
1095 add_interface_router(self.neutron_client,
1099 @mock.patch('functest.utils.openstack_utils.logger.error')
1100 def test_add_interface_router_exception(self, mock_logger_error):
1101 self.assertFalse(openstack_utils.
1102 add_interface_router(Exception,
1105 self.assertTrue(mock_logger_error.called)
1107 def test_add_gateway_router_default(self):
1108 with mock.patch('functest.utils.openstack_utils.'
1109 'get_external_net_id',
1110 return_value='network_id'):
1111 self.assertTrue(openstack_utils.
1112 add_gateway_router(self.neutron_client,
1115 @mock.patch('functest.utils.openstack_utils.logger.error')
1116 def test_add_gateway_router_exception(self, mock_logger_error):
1117 with mock.patch('functest.utils.openstack_utils.'
1118 'get_external_net_id',
1119 return_value='network_id'):
1120 self.assertFalse(openstack_utils.
1121 add_gateway_router(Exception,
1123 self.assertTrue(mock_logger_error.called)
1125 def test_delete_neutron_net_default(self):
1126 self.assertTrue(openstack_utils.
1127 delete_neutron_net(self.neutron_client,
1130 @mock.patch('functest.utils.openstack_utils.logger.error')
1131 def test_delete_neutron_net_exception(self, mock_logger_error):
1132 self.assertFalse(openstack_utils.
1133 delete_neutron_net(Exception,
1135 self.assertTrue(mock_logger_error.called)
1137 def test_delete_neutron_subnet_default(self):
1138 self.assertTrue(openstack_utils.
1139 delete_neutron_subnet(self.neutron_client,
1142 @mock.patch('functest.utils.openstack_utils.logger.error')
1143 def test_delete_neutron_subnet_exception(self, mock_logger_error):
1144 self.assertFalse(openstack_utils.
1145 delete_neutron_subnet(Exception,
1147 self.assertTrue(mock_logger_error.called)
1149 def test_delete_neutron_router_default(self):
1150 self.assertTrue(openstack_utils.
1151 delete_neutron_router(self.neutron_client,
1154 @mock.patch('functest.utils.openstack_utils.logger.error')
1155 def test_delete_neutron_router_exception(self, mock_logger_error):
1156 self.assertFalse(openstack_utils.
1157 delete_neutron_router(Exception,
1159 self.assertTrue(mock_logger_error.called)
1161 def test_delete_neutron_port_default(self):
1162 self.assertTrue(openstack_utils.
1163 delete_neutron_port(self.neutron_client,
1166 @mock.patch('functest.utils.openstack_utils.logger.error')
1167 def test_delete_neutron_port_exception(self, mock_logger_error):
1168 self.assertFalse(openstack_utils.
1169 delete_neutron_port(Exception,
1171 self.assertTrue(mock_logger_error.called)
1173 def test_remove_interface_router_default(self):
1174 self.assertTrue(openstack_utils.
1175 remove_interface_router(self.neutron_client,
1179 @mock.patch('functest.utils.openstack_utils.logger.error')
1180 def test_remove_interface_router_exception(self, mock_logger_error):
1181 self.assertFalse(openstack_utils.
1182 remove_interface_router(Exception,
1185 self.assertTrue(mock_logger_error.called)
1187 def test_remove_gateway_router_default(self):
1188 self.assertTrue(openstack_utils.
1189 remove_gateway_router(self.neutron_client,
1192 @mock.patch('functest.utils.openstack_utils.logger.error')
1193 def test_remove_gateway_router_exception(self, mock_logger_error):
1194 self.assertFalse(openstack_utils.
1195 remove_gateway_router(Exception,
1197 self.assertTrue(mock_logger_error.called)
1199 def test_get_security_groups_default(self):
1200 self.assertEqual(openstack_utils.
1201 get_security_groups(self.neutron_client),
1204 @mock.patch('functest.utils.openstack_utils.logger.error')
1205 def test_get_security_groups_exception(self, mock_logger_error):
1206 self.assertEqual(openstack_utils.
1207 get_security_groups(Exception),
1209 self.assertTrue(mock_logger_error.called)
1211 def test_get_security_group_id_default(self):
1212 with mock.patch('functest.utils.openstack_utils.'
1213 'get_security_groups',
1214 return_value=[self.sec_group]):
1215 self.assertEqual(openstack_utils.
1216 get_security_group_id(self.neutron_client,
def test_get_security_group_rules_default(self):
    """Check get_security_group_rules against the stored rule.

    Calls the helper with ``self.neutron_client`` and the ``'id'`` entry of
    ``self.sec_group``, and expects a one-element list holding
    ``self.sec_group_rule``.
    """
    returned_rules = openstack_utils.get_security_group_rules(
        self.neutron_client, self.sec_group['id'])
    expected_rules = [self.sec_group_rule]
    self.assertEqual(returned_rules, expected_rules)
1226 @mock.patch('functest.utils.openstack_utils.logger.error')
1227 def test_get_security_group_rules_exception(self, mock_logger_error):
1228 self.assertEqual(openstack_utils.
1229 get_security_group_rules(Exception,
1232 self.assertTrue(mock_logger_error.called)
1234 def test_check_security_group_rules_not_exists(self):
1235 self.assertEqual(openstack_utils.
1236 check_security_group_rules(self.neutron_client,
1244 def test_check_security_group_rules_exists(self):
1245 self.assertEqual(openstack_utils.
1246 check_security_group_rules(self.neutron_client,
1247 self.sec_group['id'],
1254 @mock.patch('functest.utils.openstack_utils.logger.error')
1255 def test_check_security_group_rules_exception(self, mock_logger_error):
1256 self.assertEqual(openstack_utils.
1257 check_security_group_rules(Exception,
1264 self.assertTrue(mock_logger_error.called)
1266 def test_create_security_group_default(self):
1267 self.assertEqual(openstack_utils.
1268 create_security_group(self.neutron_client,
1273 @mock.patch('functest.utils.openstack_utils.logger.error')
1274 def test_create_security_group_exception(self, mock_logger_error):
1275 self.assertEqual(openstack_utils.
1276 create_security_group(Exception,
1280 self.assertTrue(mock_logger_error.called)
1282 def test_create_secgroup_rule_default(self):
1283 self.assertTrue(openstack_utils.
1284 create_secgroup_rule(self.neutron_client,
1290 self.assertTrue(openstack_utils.
1291 create_secgroup_rule(self.neutron_client,
1296 @mock.patch('functest.utils.openstack_utils.logger.error')
1297 def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error):
1298 self.assertFalse(openstack_utils.
1299 create_secgroup_rule(self.neutron_client,
1305 @mock.patch('functest.utils.openstack_utils.logger.error')
1306 def test_create_secgroup_rule_exception(self, mock_logger_error):
1307 self.assertFalse(openstack_utils.
1308 create_secgroup_rule(Exception,
1313 @mock.patch('functest.utils.openstack_utils.logger.info')
1314 def test_create_security_group_full_default(self, mock_logger_info):
1315 with mock.patch('functest.utils.openstack_utils.'
1316 'get_security_group_id',
1317 return_value='sg_id'):
1318 self.assertEqual(openstack_utils.
1319 create_security_group_full(self.neutron_client,
1323 self.assertTrue(mock_logger_info)
1325 @mock.patch('functest.utils.openstack_utils.logger.info')
1326 @mock.patch('functest.utils.openstack_utils.logger.error')
1327 def test_create_security_group_full_sec_group_fail(self,
1330 with mock.patch('functest.utils.openstack_utils.'
1331 'get_security_group_id',
1333 mock.patch('functest.utils.openstack_utils.'
1334 'create_security_group',
1335 return_value=False):
1336 self.assertEqual(openstack_utils.
1337 create_security_group_full(self.neutron_client,
1341 self.assertTrue(mock_logger_error)
1342 self.assertTrue(mock_logger_info)
1344 @mock.patch('functest.utils.openstack_utils.logger.debug')
1345 @mock.patch('functest.utils.openstack_utils.logger.info')
1346 @mock.patch('functest.utils.openstack_utils.logger.error')
1347 def test_create_security_group_full_secgroup_rule_fail(self,
1351 with mock.patch('functest.utils.openstack_utils.'
1352 'get_security_group_id',
1354 mock.patch('functest.utils.openstack_utils.'
1355 'create_security_group',
1356 return_value={'id': 'sg_id',
1357 'name': 'sg_name'}), \
1358 mock.patch('functest.utils.openstack_utils.'
1359 'create_secgroup_rule',
1360 return_value=False):
1361 self.assertEqual(openstack_utils.
1362 create_security_group_full(self.neutron_client,
1366 self.assertTrue(mock_logger_error)
1367 self.assertTrue(mock_logger_info)
1368 self.assertTrue(mock_logger_debug)
1370 def test_add_secgroup_to_instance_default(self):
1371 self.assertTrue(openstack_utils.
1372 add_secgroup_to_instance(self.nova_client,
1376 @mock.patch('functest.utils.openstack_utils.logger.error')
1377 def test_add_secgroup_to_instance_exception(self, mock_logger_error):
1378 self.assertFalse(openstack_utils.
1379 add_secgroup_to_instance(Exception,
1382 self.assertTrue(mock_logger_error.called)
1384 def test_update_sg_quota_default(self):
1385 self.assertTrue(openstack_utils.
1386 update_sg_quota(self.neutron_client,
1391 @mock.patch('functest.utils.openstack_utils.logger.error')
1392 def test_update_sg_quota_exception(self, mock_logger_error):
1393 self.assertFalse(openstack_utils.
1394 update_sg_quota(Exception,
1398 self.assertTrue(mock_logger_error.called)
1400 def test_delete_security_group_default(self):
1401 self.assertTrue(openstack_utils.
1402 delete_security_group(self.neutron_client,
1405 @mock.patch('functest.utils.openstack_utils.logger.error')
1406 def test_delete_security_group_exception(self, mock_logger_error):
1407 self.assertFalse(openstack_utils.
1408 delete_security_group(Exception,
1410 self.assertTrue(mock_logger_error.called)
1412 def test_get_images_default(self):
1413 self.assertEqual(openstack_utils.
1414 get_images(self.glance_client),
1417 @mock.patch('functest.utils.openstack_utils.logger.error')
1418 def test_get_images_exception(self, mock_logger_error):
1419 self.assertEqual(openstack_utils.
1420 get_images(Exception),
1422 self.assertTrue(mock_logger_error.called)
1424 def test_get_image_id_default(self):
1425 self.assertEqual(openstack_utils.
1426 get_image_id(self.glance_client,
1430 # create_glance_image, get_or_create_image
1431 @mock.patch('functest.utils.openstack_utils.logger.error')
1432 def test_create_glance_image_file_present(self, mock_logger_error):
1433 with mock.patch('functest.utils.openstack_utils.'
1435 return_value=False):
1436 self.assertEqual(openstack_utils.
1437 create_glance_image(self.glance_client,
1441 self.assertTrue(mock_logger_error.called)
1443 @mock.patch('functest.utils.openstack_utils.logger.info')
1444 def test_create_glance_image_already_exist(self, mock_logger_info):
1445 with mock.patch('functest.utils.openstack_utils.'
1447 return_value=True), \
1448 mock.patch('functest.utils.openstack_utils.get_image_id',
1449 return_value='image_id'):
1450 self.assertEqual(openstack_utils.
1451 create_glance_image(self.glance_client,
1455 self.assertTrue(mock_logger_info.called)
1457 @mock.patch('functest.utils.openstack_utils.logger.info')
1458 def test_create_glance_image_default(self, mock_logger_info):
1459 with mock.patch('functest.utils.openstack_utils.'
1461 return_value=True), \
1462 mock.patch('functest.utils.openstack_utils.get_image_id',
1464 mock.patch('six.moves.builtins.open',
1465 mock.mock_open(read_data='1')) as m:
1466 self.assertEqual(openstack_utils.
1467 create_glance_image(self.glance_client,
1471 m.assert_called_once_with('file_path')
1472 self.assertTrue(mock_logger_info.called)
1474 @mock.patch('functest.utils.openstack_utils.logger.error')
1475 def test_create_glance_image_exception(self, mock_logger_error):
1476 with mock.patch('functest.utils.openstack_utils.'
1478 return_value=True), \
1479 mock.patch('functest.utils.openstack_utils.get_image_id',
1480 side_effect=Exception):
1481 self.assertEqual(openstack_utils.
1482 create_glance_image(self.glance_client,
1486 self.assertTrue(mock_logger_error.called)
1488 def test_delete_glance_image_default(self):
1489 self.assertTrue(openstack_utils.
1490 delete_glance_image(self.nova_client,
1493 @mock.patch('functest.utils.openstack_utils.logger.error')
1494 def test_delete_glance_image_exception(self, mock_logger_error):
1495 self.assertFalse(openstack_utils.
1496 delete_glance_image(Exception,
1498 self.assertTrue(mock_logger_error.called)
1500 def test_get_volumes_default(self):
1501 self.assertEqual(openstack_utils.
1502 get_volumes(self.cinder_client),
1505 @mock.patch('functest.utils.openstack_utils.logger.error')
1506 def test_get_volumes_exception(self, mock_logger_error):
1507 self.assertEqual(openstack_utils.
1508 get_volumes(Exception),
1510 self.assertTrue(mock_logger_error.called)
1512 def test_update_cinder_quota_default(self):
1513 self.assertTrue(openstack_utils.
1514 update_cinder_quota(self.cinder_client,
1520 @mock.patch('functest.utils.openstack_utils.logger.error')
1521 def test_update_cinder_quota_exception(self, mock_logger_error):
1522 self.assertFalse(openstack_utils.
1523 update_cinder_quota(Exception,
1528 self.assertTrue(mock_logger_error.called)
1530 def test_delete_volume_default(self):
1531 self.assertTrue(openstack_utils.
1532 delete_volume(self.cinder_client,
1536 self.assertTrue(openstack_utils.
1537 delete_volume(self.cinder_client,
1541 @mock.patch('functest.utils.openstack_utils.logger.error')
1542 def test_delete_volume_exception(self, mock_logger_error):
1543 self.assertFalse(openstack_utils.
1544 delete_volume(Exception,
1547 self.assertTrue(mock_logger_error.called)
1549 def test_get_tenants_default(self):
1550 with mock.patch('functest.utils.openstack_utils.'
1551 'is_keystone_v3', return_value=True):
1552 self.assertEqual(openstack_utils.
1553 get_tenants(self.keystone_client),
1555 with mock.patch('functest.utils.openstack_utils.'
1556 'is_keystone_v3', return_value=False):
1557 self.assertEqual(openstack_utils.
1558 get_tenants(self.keystone_client),
1561 @mock.patch('functest.utils.openstack_utils.logger.error')
1562 def test_get_tenants_exception(self, mock_logger_error):
1563 self.assertEqual(openstack_utils.
1564 get_tenants(Exception),
1566 self.assertTrue(mock_logger_error.called)
1568 def test_get_users_default(self):
1569 self.assertEqual(openstack_utils.
1570 get_users(self.keystone_client),
1573 @mock.patch('functest.utils.openstack_utils.logger.error')
1574 def test_get_users_exception(self, mock_logger_error):
1575 self.assertEqual(openstack_utils.
1576 get_users(Exception),
1578 self.assertTrue(mock_logger_error.called)
1580 def test_get_tenant_id_default(self):
1581 self.assertEqual(openstack_utils.
1582 get_tenant_id(self.keystone_client,
1586 def test_get_user_id_default(self):
1587 self.assertEqual(openstack_utils.
1588 get_user_id(self.keystone_client,
1592 def test_get_role_id_default(self):
1593 self.assertEqual(openstack_utils.
1594 get_role_id(self.keystone_client,
1598 def test_get_domain_id_default(self):
1599 self.assertEqual(openstack_utils.
1600 get_domain_id(self.keystone_client,
1604 def test_create_tenant_default(self):
1605 with mock.patch('functest.utils.openstack_utils.'
1606 'is_keystone_v3', return_value=True):
1607 CONST.__setattr__('OS_PROJECT_DOMAIN_NAME', 'Default')
1608 self.assertEqual(openstack_utils.
1609 create_tenant(self.keystone_client,
1613 with mock.patch('functest.utils.openstack_utils.'
1614 'is_keystone_v3', return_value=False):
1615 self.assertEqual(openstack_utils.
1616 create_tenant(self.keystone_client,
1621 @mock.patch('functest.utils.openstack_utils.logger.error')
1622 def test_create_tenant_exception(self, mock_logger_error):
1623 self.assertEqual(openstack_utils.
1624 create_tenant(Exception,
1628 self.assertTrue(mock_logger_error.called)
1630 def test_create_user_default(self):
1631 with mock.patch('functest.utils.openstack_utils.'
1632 'is_keystone_v3', return_value=True):
1633 self.assertEqual(openstack_utils.
1634 create_user(self.keystone_client,
1640 with mock.patch('functest.utils.openstack_utils.'
1641 'is_keystone_v3', return_value=False):
1642 self.assertEqual(openstack_utils.
1643 create_user(self.keystone_client,
1650 @mock.patch('functest.utils.openstack_utils.logger.error')
1651 def test_create_user_exception(self, mock_logger_error):
1652 self.assertEqual(openstack_utils.
1653 create_user(Exception,
1659 self.assertTrue(mock_logger_error.called)
1661 def test_add_role_user_default(self):
1662 with mock.patch('functest.utils.openstack_utils.'
1663 'is_keystone_v3', return_value=True):
1664 self.assertTrue(openstack_utils.
1665 add_role_user(self.keystone_client,
1670 with mock.patch('functest.utils.openstack_utils.'
1671 'is_keystone_v3', return_value=False):
1672 self.assertTrue(openstack_utils.
1673 add_role_user(self.keystone_client,
1678 @mock.patch('functest.utils.openstack_utils.logger.error')
1679 def test_add_role_user_exception(self, mock_logger_error):
1680 self.assertFalse(openstack_utils.
1681 add_role_user(Exception,
1685 self.assertTrue(mock_logger_error.called)
1687 def test_delete_tenant_default(self):
1688 with mock.patch('functest.utils.openstack_utils.'
1689 'is_keystone_v3', return_value=True):
1690 self.assertTrue(openstack_utils.
1691 delete_tenant(self.keystone_client,
1694 with mock.patch('functest.utils.openstack_utils.'
1695 'is_keystone_v3', return_value=False):
1696 self.assertTrue(openstack_utils.
1697 delete_tenant(self.keystone_client,
1700 @mock.patch('functest.utils.openstack_utils.logger.error')
1701 def test_delete_tenant_exception(self, mock_logger_error):
1702 self.assertFalse(openstack_utils.
1703 delete_tenant(Exception,
1705 self.assertTrue(mock_logger_error.called)
1707 def test_delete_user_default(self):
1708 self.assertTrue(openstack_utils.
1709 delete_user(self.keystone_client,
1712 @mock.patch('functest.utils.openstack_utils.logger.error')
1713 def test_delete_user_exception(self, mock_logger_error):
1714 self.assertFalse(openstack_utils.
1715 delete_user(Exception,
1717 self.assertTrue(mock_logger_error.called)
1719 def test_get_resource_default(self):
1720 with mock.patch('functest.utils.openstack_utils.'
1721 'is_keystone_v3', return_value=True):
1722 self.assertEqual(openstack_utils.
1723 get_resource(self.heat_client,
1728 @mock.patch('functest.utils.openstack_utils.logger.error')
1729 def test_get_resource_exception(self, mock_logger_error):
1730 self.assertEqual(openstack_utils.
1731 get_resource(Exception,
1735 self.assertTrue(mock_logger_error.called)
1737 def test_get_or_create_user_for_vnf_get(self):
1738 with mock.patch('functest.utils.openstack_utils.'
1740 return_value='user_id'), \
1741 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1742 return_value='tenant_id'):
1743 self.assertFalse(openstack_utils.
1744 get_or_create_user_for_vnf(self.keystone_client,
1747 def test_get_or_create_user_for_vnf_create(self):
1748 with mock.patch('functest.utils.openstack_utils.'
1750 return_value=None), \
1751 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1752 return_value='tenant_id'):
1753 self.assertTrue(openstack_utils.
1754 get_or_create_user_for_vnf(self.keystone_client,
1757 def test_get_or_create_user_for_vnf_error_get_user_id(self):
1758 with mock.patch('functest.utils.openstack_utils.'
1760 side_effect=Exception):
1761 self.assertRaises(Exception)
1763 def test_get_or_create_user_for_vnf_error_get_tenant_id(self):
1764 with mock.patch('functest.utils.openstack_utils.'
1766 return_value='user_id'), \
1767 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1768 side_effect='Exception'):
1769 self.assertRaises(Exception)
1771 def test_get_or_create_tenant_for_vnf_get(self):
1772 with mock.patch('functest.utils.openstack_utils.'
1774 return_value='tenant_id'):
1776 openstack_utils.get_or_create_tenant_for_vnf(
1777 self.keystone_client, 'tenant_name', 'tenant_description'))
1779 def test_get_or_create_tenant_for_vnf_create(self):
1780 with mock.patch('functest.utils.openstack_utils.get_tenant_id',
1783 openstack_utils.get_or_create_tenant_for_vnf(
1784 self.keystone_client, 'tenant_name', 'tenant_description'))
1786 def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self):
1787 with mock.patch('functest.utils.openstack_utils.'
1789 side_effect=Exception):
1790 self.assertRaises(Exception)
1792 def test_download_and_add_image_on_glance_image_creation_failure(self):
1793 with mock.patch('functest.utils.openstack_utils.'
1795 mock.patch('functest.utils.openstack_utils.'
1796 'ft_utils.download_url',
1797 return_value=True), \
1798 mock.patch('functest.utils.openstack_utils.'
1799 'create_glance_image',
1801 resp = openstack_utils.download_and_add_image_on_glance(
1806 self.assertEqual(resp, False)
# Script entry point: only runs when the module is executed directly.
if __name__ == "__main__":
    # Suppress log records of severity CRITICAL and below before the
    # tests run.
    logging.disable(level=logging.CRITICAL)
    # Start the unittest runner with verbosity level 2.
    unittest.main(verbosity=2)