3 # All rights reserved. This program and the accompanying materials
4 # are made available under the terms of the Apache License, Version 2.0
5 # which accompanies this distribution, and is available at
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # pylint: disable=missing-docstring
17 from functest.utils import openstack_utils
18 from functest.utils.constants import CONST
21 class OSUtilsTesting(unittest.TestCase):
23 def _get_env_cred_dict(self, os_prefix=''):
24 return {'OS_USERNAME': os_prefix + 'username',
25 'OS_PASSWORD': os_prefix + 'password',
26 'OS_AUTH_URL': os_prefix + 'auth_url',
27 'OS_TENANT_NAME': os_prefix + 'tenant_name',
28 'OS_USER_DOMAIN_NAME': os_prefix + 'user_domain_name',
29 'OS_PROJECT_DOMAIN_NAME': os_prefix + 'project_domain_name',
30 'OS_PROJECT_NAME': os_prefix + 'project_name',
31 'OS_ENDPOINT_TYPE': os_prefix + 'endpoint_type',
32 'OS_REGION_NAME': os_prefix + 'region_name',
33 'OS_CACERT': os_prefix + 'https_cacert',
34 'OS_INSECURE': os_prefix + 'https_insecure'}
36 def _get_os_env_vars(self):
37 return {'username': 'test_username', 'password': 'test_password',
38 'auth_url': 'test_auth_url', 'tenant_name': 'test_tenant_name',
39 'user_domain_name': 'test_user_domain_name',
40 'project_domain_name': 'test_project_domain_name',
41 'project_name': 'test_project_name',
42 'endpoint_type': 'test_endpoint_type',
43 'region_name': 'test_region_name',
44 'https_cacert': 'test_https_cacert',
45 'https_insecure': 'test_https_insecure'}
48 self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
49 self.tenant_name = 'test_tenant_name'
50 self.env_cred_dict = self._get_env_cred_dict()
51 self.os_environs = self._get_env_cred_dict(os_prefix='test_')
52 self.os_env_vars = self._get_os_env_vars()
54 mock_obj = mock.Mock()
55 attrs = {'name': 'test_flavor',
58 mock_obj.configure_mock(**attrs)
59 self.flavor = mock_obj
61 mock_obj = mock.Mock()
62 attrs = {'name': 'test_aggregate',
64 'hosts': ['host_name']}
65 mock_obj.configure_mock(**attrs)
66 self.aggregate = mock_obj
68 mock_obj = mock.Mock()
69 attrs = {'id': 'instance_id',
70 'name': 'test_instance',
72 mock_obj.configure_mock(**attrs)
73 self.instance = mock_obj
75 mock_obj = mock.Mock()
76 attrs = {'id': 'azone_id',
77 'zoneName': 'test_azone',
79 mock_obj.configure_mock(**attrs)
80 self.availability_zone = mock_obj
82 mock_obj = mock.Mock()
83 attrs = {'floating_network_id': 'floating_id',
84 'floating_ip_address': 'test_floating_ip'}
85 mock_obj.configure_mock(**attrs)
86 self.floating_ip = mock_obj
88 mock_obj = mock.Mock()
89 attrs = {'id': 'hypervisor_id',
90 'hypervisor_hostname': 'test_hostname',
92 mock_obj.configure_mock(**attrs)
93 self.hypervisor = mock_obj
95 mock_obj = mock.Mock()
96 attrs = {'id': 'image_id',
98 mock_obj.configure_mock(**attrs)
101 mock_obj = mock.Mock()
102 self.mock_return = mock_obj
104 self.nova_client = mock.Mock()
105 attrs = {'servers.list.return_value': [self.instance],
106 'servers.get.return_value': self.instance,
107 'servers.find.return_value': self.instance,
108 'servers.create.return_value': self.instance,
109 'flavors.list.return_value': [self.flavor],
110 'flavors.find.return_value': self.flavor,
111 'servers.add_floating_ip.return_value': mock.Mock(),
112 'servers.force_delete.return_value': mock.Mock(),
113 'aggregates.list.return_value': [self.aggregate],
114 'aggregates.add_host.return_value': mock.Mock(),
115 'aggregates.remove_host.return_value': mock.Mock(),
116 'aggregates.get.return_value': self.aggregate,
117 'aggregates.delete.return_value': mock.Mock(),
118 'availability_zones.list.return_value':
119 [self.availability_zone],
120 'hypervisors.list.return_value': [self.hypervisor],
121 'create.return_value': mock.Mock(),
122 'add_security_group.return_value': mock.Mock(),
123 'images.list.return_value': [self.image],
124 'images.delete.return_value': mock.Mock(),
126 self.nova_client.configure_mock(**attrs)
128 self.glance_client = mock.Mock()
129 attrs = {'images.list.return_value': [self.image],
130 'images.create.return_value': self.image,
131 'images.upload.return_value': mock.Mock()}
132 self.glance_client.configure_mock(**attrs)
134 mock_obj = mock.Mock()
135 attrs = {'id': 'volume_id',
136 'name': 'test_volume'}
137 mock_obj.configure_mock(**attrs)
138 self.volume = mock_obj
140 self.cinder_client = mock.Mock()
141 attrs = {'volumes.list.return_value': [self.volume],
142 'quotas.update.return_value': mock.Mock(),
143 'volumes.detach.return_value': mock.Mock(),
144 'volumes.force_delete.return_value': mock.Mock(),
145 'volumes.delete.return_value': mock.Mock()
147 self.cinder_client.configure_mock(**attrs)
149 self.resource = mock.Mock()
150 attrs = {'id': 'resource_test_id',
151 'name': 'resource_test_name'
154 self.heat_client = mock.Mock()
155 attrs = {'resources.get.return_value': self.resource}
156 self.heat_client.configure_mock(**attrs)
158 mock_obj = mock.Mock()
159 attrs = {'id': 'tenant_id',
160 'name': 'test_tenant'}
161 mock_obj.configure_mock(**attrs)
162 self.tenant = mock_obj
164 mock_obj = mock.Mock()
165 attrs = {'id': 'user_id',
167 mock_obj.configure_mock(**attrs)
170 mock_obj = mock.Mock()
171 attrs = {'id': 'role_id',
173 mock_obj.configure_mock(**attrs)
176 mock_obj = mock.Mock()
177 attrs = {'id': 'domain_id',
178 'name': 'test_domain'}
179 mock_obj.configure_mock(**attrs)
180 self.domain = mock_obj
182 self.keystone_client = mock.Mock()
183 attrs = {'projects.list.return_value': [self.tenant],
184 'tenants.list.return_value': [self.tenant],
185 'users.list.return_value': [self.user],
186 'roles.list.return_value': [self.role],
187 'domains.list.return_value': [self.domain],
188 'projects.create.return_value': self.tenant,
189 'tenants.create.return_value': self.tenant,
190 'users.create.return_value': self.user,
191 'roles.grant.return_value': mock.Mock(),
192 'roles.add_user_role.return_value': mock.Mock(),
193 'projects.delete.return_value': mock.Mock(),
194 'tenants.delete.return_value': mock.Mock(),
195 'users.delete.return_value': mock.Mock(),
197 self.keystone_client.configure_mock(**attrs)
199 self.router = {'id': 'router_id',
200 'name': 'test_router'}
202 self.subnet = {'id': 'subnet_id',
203 'name': 'test_subnet'}
205 self.networks = [{'id': 'network_id',
206 'name': 'test_network',
207 'router:external': False,
209 'subnets': [self.subnet]},
210 {'id': 'network_id1',
211 'name': 'test_network1',
212 'router:external': True,
214 'subnets': [self.subnet]}]
216 self.port = {'id': 'port_id',
219 self.sec_group = {'id': 'sec_group_id',
220 'name': 'test_sec_group'}
222 self.sec_group_rule = {'id': 'sec_group_rule_id',
223 'direction': 'direction',
224 'protocol': 'protocol',
225 'port_range_max': 'port_max',
226 'security_group_id': self.sec_group['id'],
227 'port_range_min': 'port_min'}
228 self.neutron_floatingip = {'id': 'fip_id',
229 'floating_ip_address': 'test_ip'}
230 self.neutron_client = mock.Mock()
231 attrs = {'list_networks.return_value': {'networks': self.networks},
232 'list_routers.return_value': {'routers': [self.router]},
233 'list_ports.return_value': {'ports': [self.port]},
234 'list_subnets.return_value': {'subnets': [self.subnet]},
235 'create_network.return_value': {'network': self.networks[0]},
236 'create_subnet.return_value': {'subnets': [self.subnet]},
237 'create_router.return_value': {'router': self.router},
238 'create_port.return_value': {'port': self.port},
239 'create_floatingip.return_value': {'floatingip':
240 self.neutron_floatingip},
241 'update_network.return_value': mock.Mock(),
242 'update_port.return_value': {'port': self.port},
243 'add_interface_router.return_value': mock.Mock(),
244 'add_gateway_router.return_value': mock.Mock(),
245 'delete_network.return_value': mock.Mock(),
246 'delete_subnet.return_value': mock.Mock(),
247 'delete_router.return_value': mock.Mock(),
248 'delete_port.return_value': mock.Mock(),
249 'remove_interface_router.return_value': mock.Mock(),
250 'remove_gateway_router.return_value': mock.Mock(),
251 'list_security_groups.return_value': {'security_groups':
253 'list_security_group_rules.'
254 'return_value': {'security_group_rules':
255 [self.sec_group_rule]},
256 'create_security_group_rule.return_value': mock.Mock(),
257 'create_security_group.return_value': {'security_group':
259 'update_quota.return_value': mock.Mock(),
260 'delete_security_group.return_value': mock.Mock(),
261 'list_floatingips.return_value': {'floatingips':
263 'delete_floatingip.return_value': mock.Mock(),
265 self.neutron_client.configure_mock(**attrs)
267 self.empty_client = mock.Mock()
268 attrs = {'list_networks.return_value': {'networks': []},
269 'list_routers.return_value': {'routers': []},
270 'list_ports.return_value': {'ports': []},
271 'list_subnets.return_value': {'subnets': []}}
272 self.empty_client.configure_mock(**attrs)
274 @mock.patch('functest.utils.openstack_utils.os.getenv',
276 def test_is_keystone_v3_missing_identity(self, mock_os_getenv):
277 self.assertEqual(openstack_utils.is_keystone_v3(), False)
279 @mock.patch('functest.utils.openstack_utils.os.getenv',
281 def test_is_keystone_v3_default(self, mock_os_getenv):
282 self.assertEqual(openstack_utils.is_keystone_v3(), True)
284 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
286 def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env):
287 exp_resp = self.env_vars
288 exp_resp.extend(['OS_TENANT_NAME'])
289 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
291 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
293 def test_get_rc_env_vars_default(self, mock_get_rc_env):
294 exp_resp = self.env_vars
295 exp_resp.extend(['OS_PROJECT_NAME',
296 'OS_USER_DOMAIN_NAME',
297 'OS_PROJECT_DOMAIN_NAME'])
298 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
300 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
301 def test_check_credentials_missing_env(self, mock_get_rc_env):
302 exp_resp = self.env_vars
303 exp_resp.extend(['OS_TENANT_NAME'])
304 mock_get_rc_env.return_value = exp_resp
305 with mock.patch.dict('functest.utils.openstack_utils.os.environ', {},
307 self.assertEqual(openstack_utils.check_credentials(), False)
309 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
310 def test_check_credentials_default(self, mock_get_rc_env):
311 exp_resp = ['OS_TENANT_NAME']
312 mock_get_rc_env.return_value = exp_resp
313 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
314 {'OS_TENANT_NAME': self.tenant_name},
316 self.assertEqual(openstack_utils.check_credentials(), True)
318 def test_get_env_cred_dict(self):
319 self.assertDictEqual(openstack_utils.get_env_cred_dict(),
322 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
323 def test_get_credentials_default(self, mock_get_rc_env):
324 mock_get_rc_env.return_value = self.env_cred_dict.keys()
325 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
328 self.assertDictEqual(openstack_utils.get_credentials(),
331 def _get_credentials_missing_env(self, var):
332 dic = copy.deepcopy(self.os_environs)
334 with mock.patch('functest.utils.openstack_utils.get_rc_env_vars',
335 return_value=self.env_cred_dict.keys()), \
336 mock.patch.dict('functest.utils.openstack_utils.os.environ',
339 self.assertRaises(openstack_utils.MissingEnvVar,
340 lambda: openstack_utils.get_credentials())
342 def test_get_credentials_missing_username(self):
343 self._get_credentials_missing_env('OS_USERNAME')
345 def test_get_credentials_missing_password(self):
346 self._get_credentials_missing_env('OS_PASSWORD')
348 def test_get_credentials_missing_auth_url(self):
349 self._get_credentials_missing_env('OS_AUTH_URL')
351 def test_get_credentials_missing_tenantname(self):
352 self._get_credentials_missing_env('OS_TENANT_NAME')
354 def test_get_credentials_missing_domainname(self):
355 self._get_credentials_missing_env('OS_USER_DOMAIN_NAME')
357 def test_get_credentials_missing_projectname(self):
358 self._get_credentials_missing_env('OS_PROJECT_NAME')
360 def test_get_credentials_missing_endpoint_type(self):
361 self._get_credentials_missing_env('OS_ENDPOINT_TYPE')
363 def _test_source_credentials(self, msg, key='OS_TENANT_NAME',
370 with mock.patch('six.moves.builtins.open',
371 mock.mock_open(read_data=msg),
373 m.return_value.__iter__ = lambda self: iter(self.readline, '')
374 openstack_utils.source_credentials(f)
375 m.assert_called_once_with(f, 'r')
376 self.assertEqual(os.environ[key], value)
378 def test_source_credentials(self):
379 self._test_source_credentials('OS_TENANT_NAME=admin')
380 self._test_source_credentials('OS_TENANT_NAME= admin')
381 self._test_source_credentials('OS_TENANT_NAME = admin')
382 self._test_source_credentials('OS_TENANT_NAME = "admin"')
383 self._test_source_credentials('export OS_TENANT_NAME=admin')
384 self._test_source_credentials('export OS_TENANT_NAME =admin')
385 self._test_source_credentials('export OS_TENANT_NAME = admin')
386 self._test_source_credentials('export OS_TENANT_NAME = "admin"')
387 # This test will fail as soon as rc_file is fixed
388 self._test_source_credentials(
389 'export "\'OS_TENANT_NAME\'" = "\'admin\'"')
391 @mock.patch('functest.utils.openstack_utils.os.getenv',
393 def test_get_keystone_client_version_missing_env(self, mock_os_getenv):
394 self.assertEqual(openstack_utils.get_keystone_client_version(),
395 openstack_utils.DEFAULT_API_VERSION)
397 @mock.patch('functest.utils.openstack_utils.logger.info')
398 @mock.patch('functest.utils.openstack_utils.os.getenv',
400 def test_get_keystone_client_version_default(self, mock_os_getenv,
402 self.assertEqual(openstack_utils.get_keystone_client_version(),
404 mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is "
405 "set in env as '%s'", '3')
407 @mock.patch('functest.utils.openstack_utils.get_session')
408 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
409 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
411 @mock.patch('functest.utils.openstack_utils.os.getenv',
412 return_value='public')
413 def test_get_keystone_client_with_interface(self, mock_os_getenv,
414 mock_keystoneclient_version,
417 mock_keystone_obj = mock.Mock()
418 mock_session_obj = mock.Mock()
419 mock_key_client.return_value = mock_keystone_obj
420 mock_get_session.return_value = mock_session_obj
421 self.assertEqual(openstack_utils.get_keystone_client(),
423 mock_key_client.assert_called_once_with('3',
424 session=mock_session_obj,
427 @mock.patch('functest.utils.openstack_utils.get_session')
428 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
429 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
431 @mock.patch('functest.utils.openstack_utils.os.getenv',
432 return_value='admin')
433 def test_get_keystone_client_no_interface(self, mock_os_getenv,
434 mock_keystoneclient_version,
437 mock_keystone_obj = mock.Mock()
438 mock_session_obj = mock.Mock()
439 mock_key_client.return_value = mock_keystone_obj
440 mock_get_session.return_value = mock_session_obj
441 self.assertEqual(openstack_utils.get_keystone_client(),
443 mock_key_client.assert_called_once_with('3',
444 session=mock_session_obj,
447 @mock.patch('functest.utils.openstack_utils.os.getenv',
449 def test_get_nova_client_version_missing_env(self, mock_os_getenv):
450 self.assertEqual(openstack_utils.get_nova_client_version(),
451 openstack_utils.DEFAULT_API_VERSION)
453 @mock.patch('functest.utils.openstack_utils.logger.info')
454 @mock.patch('functest.utils.openstack_utils.os.getenv',
456 def test_get_nova_client_version_default(self, mock_os_getenv,
458 self.assertEqual(openstack_utils.get_nova_client_version(),
460 mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is "
461 "set in env as '%s'", '3')
463 def test_get_nova_client(self):
464 mock_nova_obj = mock.Mock()
465 mock_session_obj = mock.Mock()
466 with mock.patch('functest.utils.openstack_utils'
467 '.get_nova_client_version', return_value='3'), \
468 mock.patch('functest.utils.openstack_utils'
469 '.novaclient.Client',
470 return_value=mock_nova_obj) \
471 as mock_nova_client, \
472 mock.patch('functest.utils.openstack_utils.get_session',
473 return_value=mock_session_obj):
474 self.assertEqual(openstack_utils.get_nova_client(),
476 mock_nova_client.assert_called_once_with('3',
477 session=mock_session_obj)
479 @mock.patch('functest.utils.openstack_utils.os.getenv',
481 def test_get_cinder_client_version_missing_env(self, mock_os_getenv):
482 self.assertEqual(openstack_utils.get_cinder_client_version(),
483 openstack_utils.DEFAULT_API_VERSION)
485 @mock.patch('functest.utils.openstack_utils.logger.info')
486 @mock.patch('functest.utils.openstack_utils.os.getenv',
488 def test_get_cinder_client_version_default(self, mock_os_getenv,
490 self.assertEqual(openstack_utils.get_cinder_client_version(),
492 mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is "
493 "set in env as '%s'", '3')
495 def test_get_cinder_client(self):
496 mock_cinder_obj = mock.Mock()
497 mock_session_obj = mock.Mock()
498 with mock.patch('functest.utils.openstack_utils'
499 '.get_cinder_client_version', return_value='3'), \
500 mock.patch('functest.utils.openstack_utils'
501 '.cinderclient.Client',
502 return_value=mock_cinder_obj) \
503 as mock_cind_client, \
504 mock.patch('functest.utils.openstack_utils.get_session',
505 return_value=mock_session_obj):
506 self.assertEqual(openstack_utils.get_cinder_client(),
508 mock_cind_client.assert_called_once_with('3',
509 session=mock_session_obj)
511 @mock.patch('functest.utils.openstack_utils.os.getenv',
513 def test_get_neutron_client_version_missing_env(self, mock_os_getenv):
514 self.assertEqual(openstack_utils.get_neutron_client_version(),
515 openstack_utils.DEFAULT_API_VERSION)
517 @mock.patch('functest.utils.openstack_utils.logger.info')
518 @mock.patch('functest.utils.openstack_utils.os.getenv',
520 def test_get_neutron_client_version_default(self, mock_os_getenv,
522 self.assertEqual(openstack_utils.get_neutron_client_version(),
524 mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is "
525 "set in env as '%s'", '3')
527 def test_get_neutron_client(self):
528 mock_neutron_obj = mock.Mock()
529 mock_session_obj = mock.Mock()
530 with mock.patch('functest.utils.openstack_utils'
531 '.get_neutron_client_version', return_value='3'), \
532 mock.patch('functest.utils.openstack_utils'
533 '.neutronclient.Client',
534 return_value=mock_neutron_obj) \
535 as mock_neut_client, \
536 mock.patch('functest.utils.openstack_utils.get_session',
537 return_value=mock_session_obj):
538 self.assertEqual(openstack_utils.get_neutron_client(),
540 mock_neut_client.assert_called_once_with('3',
541 session=mock_session_obj)
543 @mock.patch('functest.utils.openstack_utils.os.getenv',
545 def test_get_glance_client_version_missing_env(self, mock_os_getenv):
546 self.assertEqual(openstack_utils.get_glance_client_version(),
547 openstack_utils.DEFAULT_API_VERSION)
549 @mock.patch('functest.utils.openstack_utils.logger.info')
550 @mock.patch('functest.utils.openstack_utils.os.getenv',
552 def test_get_glance_client_version_default(self, mock_os_getenv,
554 self.assertEqual(openstack_utils.get_glance_client_version(),
556 mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is "
557 "set in env as '%s'", '3')
559 def test_get_glance_client(self):
560 mock_glance_obj = mock.Mock()
561 mock_session_obj = mock.Mock()
562 with mock.patch('functest.utils.openstack_utils'
563 '.get_glance_client_version', return_value='3'), \
564 mock.patch('functest.utils.openstack_utils'
565 '.glanceclient.Client',
566 return_value=mock_glance_obj) \
567 as mock_glan_client, \
568 mock.patch('functest.utils.openstack_utils.get_session',
569 return_value=mock_session_obj):
570 self.assertEqual(openstack_utils.get_glance_client(),
572 mock_glan_client.assert_called_once_with('3',
573 session=mock_session_obj)
575 @mock.patch('functest.utils.openstack_utils.os.getenv',
577 def test_get_heat_client_version_missing_env(self, mock_os_getenv):
578 self.assertEqual(openstack_utils.get_heat_client_version(),
579 openstack_utils.DEFAULT_HEAT_API_VERSION)
581 @mock.patch('functest.utils.openstack_utils.logger.info')
582 @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1')
583 def test_get_heat_client_version_default(self, mock_os_getenv,
585 self.assertEqual(openstack_utils.get_heat_client_version(), '1')
586 mock_logger_info.assert_called_once_with(
587 "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1')
589 def test_get_heat_client(self):
590 mock_heat_obj = mock.Mock()
591 mock_session_obj = mock.Mock()
592 with mock.patch('functest.utils.openstack_utils'
593 '.get_heat_client_version', return_value='1'), \
594 mock.patch('functest.utils.openstack_utils'
595 '.heatclient.Client',
596 return_value=mock_heat_obj) \
597 as mock_heat_client, \
598 mock.patch('functest.utils.openstack_utils.get_session',
599 return_value=mock_session_obj):
600 self.assertEqual(openstack_utils.get_heat_client(),
602 mock_heat_client.assert_called_once_with('1',
603 session=mock_session_obj)
605 def test_get_instances_default(self):
606 self.assertEqual(openstack_utils.get_instances(self.nova_client),
609 @mock.patch('functest.utils.openstack_utils.logger.error')
610 def test_get_instances_exception(self, mock_logger_error):
611 self.assertEqual(openstack_utils.
612 get_instances(Exception),
614 self.assertTrue(mock_logger_error.called)
616 def test_get_instance_status_default(self):
617 self.assertEqual(openstack_utils.get_instance_status(self.nova_client,
621 @mock.patch('functest.utils.openstack_utils.logger.error')
622 def test_get_instance_status_exception(self, mock_logger_error):
623 self.assertEqual(openstack_utils.
624 get_instance_status(Exception,
627 self.assertTrue(mock_logger_error.called)
629 def test_get_instance_by_name_default(self):
630 self.assertEqual(openstack_utils.
631 get_instance_by_name(self.nova_client,
635 @mock.patch('functest.utils.openstack_utils.logger.error')
636 def test_get_instance_by_name_exception(self, mock_logger_error):
637 self.assertEqual(openstack_utils.
638 get_instance_by_name(Exception,
641 self.assertTrue(mock_logger_error.called)
643 def test_get_flavor_id_default(self):
644 self.assertEqual(openstack_utils.
645 get_flavor_id(self.nova_client,
649 def test_get_flavor_id_by_ram_range_default(self):
650 self.assertEqual(openstack_utils.
651 get_flavor_id_by_ram_range(self.nova_client,
655 def test_get_aggregates_default(self):
656 self.assertEqual(openstack_utils.
657 get_aggregates(self.nova_client),
660 @mock.patch('functest.utils.openstack_utils.logger.error')
661 def test_get_aggregates_exception(self, mock_logger_error):
662 self.assertEqual(openstack_utils.
663 get_aggregates(Exception),
665 self.assertTrue(mock_logger_error.called)
667 def test_get_aggregate_id_default(self):
668 with mock.patch('functest.utils.openstack_utils.get_aggregates',
669 return_value=[self.aggregate]):
670 self.assertEqual(openstack_utils.
671 get_aggregate_id(self.nova_client,
675 @mock.patch('functest.utils.openstack_utils.logger.error')
676 def test_get_aggregate_id_exception(self, mock_logger_error):
677 self.assertEqual(openstack_utils.
678 get_aggregate_id(Exception,
681 self.assertTrue(mock_logger_error.called)
683 def test_get_availability_zone_names_default(self):
684 with mock.patch('functest.utils.openstack_utils'
685 '.get_availability_zones',
686 return_value=[self.availability_zone]):
687 self.assertEqual(openstack_utils.
688 get_availability_zone_names(self.nova_client),
691 @mock.patch('functest.utils.openstack_utils.logger.error')
692 def test_get_availability_zone_names_exception(self, mock_logger_error):
693 self.assertEqual(openstack_utils.
694 get_availability_zone_names(Exception),
696 self.assertTrue(mock_logger_error.called)
def test_get_availability_zones_default(self):
    # get_availability_zones() is expected to hand back a list holding
    # exactly the availability-zone fixture stored on this test case.
    returned_zones = openstack_utils.get_availability_zones(self.nova_client)
    expected_zones = [self.availability_zone]
    self.assertEqual(returned_zones, expected_zones)
703 @mock.patch('functest.utils.openstack_utils.logger.error')
704 def test_get_availability_zones_exception(self, mock_logger_error):
705 self.assertEqual(openstack_utils.
706 get_availability_zones(Exception),
708 self.assertTrue(mock_logger_error.called)
710 def test_get_floating_ips_default(self):
711 self.assertEqual(openstack_utils.
712 get_floating_ips(self.neutron_client),
715 @mock.patch('functest.utils.openstack_utils.logger.error')
716 def test_get_floating_ips_exception(self, mock_logger_error):
717 self.assertEqual(openstack_utils.
718 get_floating_ips(Exception),
720 self.assertTrue(mock_logger_error.called)
722 def test_get_hypervisors_default(self):
723 self.assertEqual(openstack_utils.
724 get_hypervisors(self.nova_client),
727 @mock.patch('functest.utils.openstack_utils.logger.error')
728 def test_get_hypervisors_exception(self, mock_logger_error):
729 self.assertEqual(openstack_utils.
730 get_hypervisors(Exception),
732 self.assertTrue(mock_logger_error.called)
734 def test_create_aggregate_default(self):
735 self.assertTrue(openstack_utils.
736 create_aggregate(self.nova_client,
740 @mock.patch('functest.utils.openstack_utils.logger.error')
741 def test_create_aggregate_exception(self, mock_logger_error):
742 self.assertEqual(openstack_utils.
743 create_aggregate(Exception,
747 self.assertTrue(mock_logger_error.called)
749 def test_add_host_to_aggregate_default(self):
750 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
751 self.assertTrue(openstack_utils.
752 add_host_to_aggregate(self.nova_client,
756 @mock.patch('functest.utils.openstack_utils.logger.error')
757 def test_add_host_to_aggregate_exception(self, mock_logger_error):
758 self.assertEqual(openstack_utils.
759 add_host_to_aggregate(Exception,
763 self.assertTrue(mock_logger_error.called)
765 def test_create_aggregate_with_host_default(self):
766 with mock.patch('functest.utils.openstack_utils.create_aggregate'), \
767 mock.patch('functest.utils.openstack_utils.'
768 'add_host_to_aggregate'):
769 self.assertTrue(openstack_utils.
770 create_aggregate_with_host(self.nova_client,
775 @mock.patch('functest.utils.openstack_utils.logger.error')
776 def test_create_aggregate_with_host_exception(self, mock_logger_error):
777 with mock.patch('functest.utils.openstack_utils.create_aggregate',
778 side_effect=Exception):
779 self.assertEqual(openstack_utils.
780 create_aggregate_with_host(Exception,
785 self.assertTrue(mock_logger_error.called)
787 def test_create_instance_default(self):
788 with mock.patch('functest.utils.openstack_utils.'
790 return_value=self.nova_client):
791 self.assertEqual(openstack_utils.
792 create_instance('test_flavor',
797 @mock.patch('functest.utils.openstack_utils.logger.error')
798 def test_create_instance_exception(self, mock_logger_error):
799 with mock.patch('functest.utils.openstack_utils.'
801 return_value=self.nova_client):
802 self.nova_client.flavors.find.side_effect = Exception
803 self.assertEqual(openstack_utils.
804 create_instance('test_flavor',
808 self.assertTrue(mock_logger_error)
810 def test_create_floating_ip_default(self):
811 with mock.patch('functest.utils.openstack_utils.'
812 'get_external_net_id',
813 return_value='external_net_id'):
814 exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'}
815 self.assertEqual(openstack_utils.
816 create_floating_ip(self.neutron_client),
819 @mock.patch('functest.utils.openstack_utils.logger.error')
820 def test_create_floating_ip_exception(self, mock_logger_error):
821 with mock.patch('functest.utils.openstack_utils.'
822 'get_external_net_id',
823 return_value='external_net_id'):
824 self.assertEqual(openstack_utils.
825 create_floating_ip(Exception),
827 self.assertTrue(mock_logger_error)
829 def test_add_floating_ip_default(self):
830 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
831 self.assertTrue(openstack_utils.
832 add_floating_ip(self.nova_client,
834 'test_floatingip_addr'))
836 @mock.patch('functest.utils.openstack_utils.logger.error')
837 def test_add_floating_ip_exception(self, mock_logger_error):
838 self.assertFalse(openstack_utils.
839 add_floating_ip(Exception,
841 'test_floatingip_addr'))
842 self.assertTrue(mock_logger_error.called)
844 def test_delete_instance_default(self):
845 self.assertTrue(openstack_utils.
846 delete_instance(self.nova_client,
849 @mock.patch('functest.utils.openstack_utils.logger.error')
850 def test_delete_instance_exception(self, mock_logger_error):
851 self.assertFalse(openstack_utils.
852 delete_instance(Exception,
854 self.assertTrue(mock_logger_error.called)
856 def test_delete_floating_ip_default(self):
857 self.assertTrue(openstack_utils.
858 delete_floating_ip(self.neutron_client,
861 @mock.patch('functest.utils.openstack_utils.logger.error')
862 def test_delete_floating_ip_exception(self, mock_logger_error):
863 self.assertFalse(openstack_utils.
864 delete_floating_ip(Exception,
866 self.assertTrue(mock_logger_error.called)
868 def test_remove_host_from_aggregate_default(self):
869 with mock.patch('functest.utils.openstack_utils.'
871 self.assertTrue(openstack_utils.
872 remove_host_from_aggregate(self.nova_client,
876 @mock.patch('functest.utils.openstack_utils.logger.error')
877 def test_remove_host_from_aggregate_exception(self, mock_logger_error):
878 with mock.patch('functest.utils.openstack_utils.'
879 'get_aggregate_id', side_effect=Exception):
880 self.assertFalse(openstack_utils.
881 remove_host_from_aggregate(self.nova_client,
884 self.assertTrue(mock_logger_error.called)
886 def test_remove_hosts_from_aggregate_default(self):
887 with mock.patch('functest.utils.openstack_utils.'
888 'get_aggregate_id'), \
889 mock.patch('functest.utils.openstack_utils.'
890 'remove_host_from_aggregate',
893 openstack_utils.remove_hosts_from_aggregate(self.nova_client,
895 mock_method.assert_any_call(self.nova_client,
899 def test_delete_aggregate_default(self):
900 with mock.patch('functest.utils.openstack_utils.'
901 'remove_hosts_from_aggregate'):
902 self.assertTrue(openstack_utils.
903 delete_aggregate(self.nova_client,
906 @mock.patch('functest.utils.openstack_utils.logger.error')
907 def test_delete_aggregate_exception(self, mock_logger_error):
908 with mock.patch('functest.utils.openstack_utils.'
909 'remove_hosts_from_aggregate', side_effect=Exception):
910 self.assertFalse(openstack_utils.
911 delete_aggregate(self.nova_client,
913 self.assertTrue(mock_logger_error.called)
915 def test_get_network_list_default(self):
916 self.assertEqual(openstack_utils.
917 get_network_list(self.neutron_client),
920 def test_get_network_list_missing_network(self):
921 self.assertEqual(openstack_utils.
922 get_network_list(self.empty_client),
925 def test_get_router_list_default(self):
926 self.assertEqual(openstack_utils.
927 get_router_list(self.neutron_client),
930 def test_get_router_list_missing_router(self):
931 self.assertEqual(openstack_utils.
932 get_router_list(self.empty_client),
935 def test_get_port_list_default(self):
936 self.assertEqual(openstack_utils.
937 get_port_list(self.neutron_client),
940 def test_get_port_list_missing_port(self):
941 self.assertEqual(openstack_utils.
942 get_port_list(self.empty_client),
945 def test_get_network_id_default(self):
946 self.assertEqual(openstack_utils.
947 get_network_id(self.neutron_client,
951 def test_get_subnet_id_default(self):
952 self.assertEqual(openstack_utils.
953 get_subnet_id(self.neutron_client,
957 def test_get_router_id_default(self):
958 self.assertEqual(openstack_utils.
959 get_router_id(self.neutron_client,
963 def test_get_private_net_default(self):
964 self.assertEqual(openstack_utils.
965 get_private_net(self.neutron_client),
968 def test_get_private_net_missing_net(self):
969 self.assertEqual(openstack_utils.
970 get_private_net(self.empty_client),
973 def test_get_external_net_default(self):
974 self.assertEqual(openstack_utils.
975 get_external_net(self.neutron_client),
# Compares get_external_net(self.empty_client) against the expected value.
978 def test_get_external_net_missing_net(self):
979 self.assertEqual(openstack_utils.
980 get_external_net(self.empty_client),
# Compares get_external_net_id(self.neutron_client) against the expected value.
983 def test_get_external_net_id_default(self):
984 self.assertEqual(openstack_utils.
985 get_external_net_id(self.neutron_client),
# Compares get_external_net_id(self.empty_client) against the expected value.
988 def test_get_external_net_id_missing_net(self):
989 self.assertEqual(openstack_utils.
990 get_external_net_id(self.empty_client),
# Expects a truthy result from check_neutron_net when it is called with
# self.neutron_client as its first argument.
993 def test_check_neutron_net_default(self):
994 self.assertTrue(openstack_utils.
995 check_neutron_net(self.neutron_client,
# Expects a falsy result from check_neutron_net when it is called with
# self.empty_client as its first argument.
998 def test_check_neutron_net_missing_net(self):
999 self.assertFalse(openstack_utils.
1000 check_neutron_net(self.empty_client,
# Compares the result of create_neutron_net, called with self.neutron_client
# as its first argument, against the expected value.
1003 def test_create_neutron_net_default(self):
1004 self.assertEqual(openstack_utils.
1005 create_neutron_net(self.neutron_client,
# Passes the Exception class as the first argument of create_neutron_net
# (the position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1009 @mock.patch('functest.utils.openstack_utils.logger.error')
1010 def test_create_neutron_net_exception(self, mock_logger_error):
1011 self.assertEqual(openstack_utils.
1012 create_neutron_net(Exception,
1015 self.assertTrue(mock_logger_error.called)
# Compares the result of create_neutron_subnet, called with
# self.neutron_client as its first argument, against the expected value.
1017 def test_create_neutron_subnet_default(self):
1018 self.assertEqual(openstack_utils.
1019 create_neutron_subnet(self.neutron_client,
# Passes the Exception class as the first argument of create_neutron_subnet
# (the position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1025 @mock.patch('functest.utils.openstack_utils.logger.error')
1026 def test_create_neutron_subnet_exception(self, mock_logger_error):
1027 self.assertEqual(openstack_utils.
1028 create_neutron_subnet(Exception,
1033 self.assertTrue(mock_logger_error.called)
# Compares the result of create_neutron_router, called with
# self.neutron_client as its first argument, against the expected value.
1035 def test_create_neutron_router_default(self):
1036 self.assertEqual(openstack_utils.
1037 create_neutron_router(self.neutron_client,
# Passes the Exception class as the first argument of create_neutron_router
# (the position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1041 @mock.patch('functest.utils.openstack_utils.logger.error')
1042 def test_create_neutron_router_exception(self, mock_logger_error):
1043 self.assertEqual(openstack_utils.
1044 create_neutron_router(Exception,
1047 self.assertTrue(mock_logger_error.called)
# Compares the result of create_neutron_port, called with
# self.neutron_client as its first argument, against the expected value.
1049 def test_create_neutron_port_default(self):
1050 self.assertEqual(openstack_utils.
1051 create_neutron_port(self.neutron_client,
# Passes the Exception class as the first argument of create_neutron_port
# (the position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1057 @mock.patch('functest.utils.openstack_utils.logger.error')
1058 def test_create_neutron_port_exception(self, mock_logger_error):
1059 self.assertEqual(openstack_utils.
1060 create_neutron_port(Exception,
1065 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from update_neutron_net when it is called with
# self.neutron_client as its first argument.
1067 def test_update_neutron_net_default(self):
1068 self.assertTrue(openstack_utils.
1069 update_neutron_net(self.neutron_client,
# Passes the Exception class as the first argument of update_neutron_net
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1072 @mock.patch('functest.utils.openstack_utils.logger.error')
1073 def test_update_neutron_net_exception(self, mock_logger_error):
1074 self.assertFalse(openstack_utils.
1075 update_neutron_net(Exception,
1077 self.assertTrue(mock_logger_error.called)
# Compares the result of update_neutron_port, called with
# self.neutron_client as its first argument, against the expected value.
1079 def test_update_neutron_port_default(self):
1080 self.assertEqual(openstack_utils.
1081 update_neutron_port(self.neutron_client,
# Passes the Exception class as the first argument of update_neutron_port
# (the position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1086 @mock.patch('functest.utils.openstack_utils.logger.error')
1087 def test_update_neutron_port_exception(self, mock_logger_error):
1088 self.assertEqual(openstack_utils.
1089 update_neutron_port(Exception,
1093 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from add_interface_router when it is called with
# self.neutron_client as its first argument.
1095 def test_add_interface_router_default(self):
1096 self.assertTrue(openstack_utils.
1097 add_interface_router(self.neutron_client,
# Passes the Exception class as the first argument of add_interface_router
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1101 @mock.patch('functest.utils.openstack_utils.logger.error')
1102 def test_add_interface_router_exception(self, mock_logger_error):
1103 self.assertFalse(openstack_utils.
1104 add_interface_router(Exception,
1107 self.assertTrue(mock_logger_error.called)
# With get_external_net_id patched to return 'network_id', expects a truthy
# result from add_gateway_router called with self.neutron_client first.
1109 def test_add_gateway_router_default(self):
1110 with mock.patch('functest.utils.openstack_utils.'
1111 'get_external_net_id',
1112 return_value='network_id'):
1113 self.assertTrue(openstack_utils.
1114 add_gateway_router(self.neutron_client,
# With get_external_net_id patched to return 'network_id', passes the
# Exception class as the first argument of add_gateway_router, expects a
# falsy result and checks that logger.error was called.
1117 @mock.patch('functest.utils.openstack_utils.logger.error')
1118 def test_add_gateway_router_exception(self, mock_logger_error):
1119 with mock.patch('functest.utils.openstack_utils.'
1120 'get_external_net_id',
1121 return_value='network_id'):
1122 self.assertFalse(openstack_utils.
1123 add_gateway_router(Exception,
1125 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_neutron_net when it is called with
# self.neutron_client as its first argument.
1127 def test_delete_neutron_net_default(self):
1128 self.assertTrue(openstack_utils.
1129 delete_neutron_net(self.neutron_client,
# Passes the Exception class as the first argument of delete_neutron_net
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1132 @mock.patch('functest.utils.openstack_utils.logger.error')
1133 def test_delete_neutron_net_exception(self, mock_logger_error):
1134 self.assertFalse(openstack_utils.
1135 delete_neutron_net(Exception,
1137 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_neutron_subnet when it is called with
# self.neutron_client as its first argument.
1139 def test_delete_neutron_subnet_default(self):
1140 self.assertTrue(openstack_utils.
1141 delete_neutron_subnet(self.neutron_client,
# Passes the Exception class as the first argument of delete_neutron_subnet
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1144 @mock.patch('functest.utils.openstack_utils.logger.error')
1145 def test_delete_neutron_subnet_exception(self, mock_logger_error):
1146 self.assertFalse(openstack_utils.
1147 delete_neutron_subnet(Exception,
1149 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_neutron_router when it is called with
# self.neutron_client as its first argument.
1151 def test_delete_neutron_router_default(self):
1152 self.assertTrue(openstack_utils.
1153 delete_neutron_router(self.neutron_client,
# Passes the Exception class as the first argument of delete_neutron_router
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1156 @mock.patch('functest.utils.openstack_utils.logger.error')
1157 def test_delete_neutron_router_exception(self, mock_logger_error):
1158 self.assertFalse(openstack_utils.
1159 delete_neutron_router(Exception,
1161 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_neutron_port when it is called with
# self.neutron_client as its first argument.
1163 def test_delete_neutron_port_default(self):
1164 self.assertTrue(openstack_utils.
1165 delete_neutron_port(self.neutron_client,
# Passes the Exception class as the first argument of delete_neutron_port
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1168 @mock.patch('functest.utils.openstack_utils.logger.error')
1169 def test_delete_neutron_port_exception(self, mock_logger_error):
1170 self.assertFalse(openstack_utils.
1171 delete_neutron_port(Exception,
1173 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from remove_interface_router when it is called
# with self.neutron_client as its first argument.
1175 def test_remove_interface_router_default(self):
1176 self.assertTrue(openstack_utils.
1177 remove_interface_router(self.neutron_client,
# Passes the Exception class as the first argument of remove_interface_router
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1181 @mock.patch('functest.utils.openstack_utils.logger.error')
1182 def test_remove_interface_router_exception(self, mock_logger_error):
1183 self.assertFalse(openstack_utils.
1184 remove_interface_router(Exception,
1187 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from remove_gateway_router when it is called with
# self.neutron_client as its first argument.
1189 def test_remove_gateway_router_default(self):
1190 self.assertTrue(openstack_utils.
1191 remove_gateway_router(self.neutron_client,
# Passes the Exception class as the first argument of remove_gateway_router
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1194 @mock.patch('functest.utils.openstack_utils.logger.error')
1195 def test_remove_gateway_router_exception(self, mock_logger_error):
1196 self.assertFalse(openstack_utils.
1197 remove_gateway_router(Exception,
1199 self.assertTrue(mock_logger_error.called)
# Compares get_security_groups(self.neutron_client) against the expected value.
1201 def test_get_security_groups_default(self):
1202 self.assertEqual(openstack_utils.
1203 get_security_groups(self.neutron_client),
# Calls get_security_groups with the Exception class instead of a client,
# compares the result against the expected value and checks that
# logger.error was called.
1206 @mock.patch('functest.utils.openstack_utils.logger.error')
1207 def test_get_security_groups_exception(self, mock_logger_error):
1208 self.assertEqual(openstack_utils.
1209 get_security_groups(Exception),
1211 self.assertTrue(mock_logger_error.called)
# With get_security_groups patched to return [self.sec_group], compares the
# result of get_security_group_id (self.neutron_client first) against the
# expected value.
1213 def test_get_security_group_id_default(self):
1214 with mock.patch('functest.utils.openstack_utils.'
1215 'get_security_groups',
1216 return_value=[self.sec_group]):
1217 self.assertEqual(openstack_utils.
1218 get_security_group_id(self.neutron_client,
def test_get_security_group_rules_default(self):
    # Ask for the rules of self.sec_group (looked up by its 'id') through
    # self.neutron_client; the answer must equal a one-element list holding
    # self.sec_group_rule.
    listed_rules = openstack_utils.get_security_group_rules(
        self.neutron_client, self.sec_group['id'])
    expected_rules = [self.sec_group_rule]
    self.assertEqual(listed_rules, expected_rules)
# Passes the Exception class as the first argument of
# get_security_group_rules (the position where the default test passes a
# client), compares the result against the expected value and checks that
# logger.error was called.
1228 @mock.patch('functest.utils.openstack_utils.logger.error')
1229 def test_get_security_group_rules_exception(self, mock_logger_error):
1230 self.assertEqual(openstack_utils.
1231 get_security_group_rules(Exception,
1234 self.assertTrue(mock_logger_error.called)
# Compares the result of check_security_group_rules, called with
# self.neutron_client as its first argument, against the expected value.
1236 def test_check_security_group_rules_not_exists(self):
1237 self.assertEqual(openstack_utils.
1238 check_security_group_rules(self.neutron_client,
# Compares the result of check_security_group_rules, called with
# self.neutron_client and the 'id' of self.sec_group as its leading
# arguments, against the expected value.
1246 def test_check_security_group_rules_exists(self):
1247 self.assertEqual(openstack_utils.
1248 check_security_group_rules(self.neutron_client,
1249 self.sec_group['id'],
# Passes the Exception class as the first argument of
# check_security_group_rules (the position where the sibling tests pass a
# client), compares the result against the expected value and checks that
# logger.error was called.
1256 @mock.patch('functest.utils.openstack_utils.logger.error')
1257 def test_check_security_group_rules_exception(self, mock_logger_error):
1258 self.assertEqual(openstack_utils.
1259 check_security_group_rules(Exception,
1266 self.assertTrue(mock_logger_error.called)
# Compares the result of create_security_group, called with
# self.neutron_client as its first argument, against the expected value.
1268 def test_create_security_group_default(self):
1269 self.assertEqual(openstack_utils.
1270 create_security_group(self.neutron_client,
# Passes the Exception class as the first argument of create_security_group
# (the position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1275 @mock.patch('functest.utils.openstack_utils.logger.error')
1276 def test_create_security_group_exception(self, mock_logger_error):
1277 self.assertEqual(openstack_utils.
1278 create_security_group(Exception,
1282 self.assertTrue(mock_logger_error.called)
# Makes two create_secgroup_rule calls, each with self.neutron_client as the
# first argument, and expects a truthy result from both.
1284 def test_create_secgroup_rule_default(self):
1285 self.assertTrue(openstack_utils.
1286 create_secgroup_rule(self.neutron_client,
1292 self.assertTrue(openstack_utils.
1293 create_secgroup_rule(self.neutron_client,
# Runs with logger.error patched and expects a falsy result from
# create_secgroup_rule called with self.neutron_client as its first argument.
1298 @mock.patch('functest.utils.openstack_utils.logger.error')
1299 def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error):
1300 self.assertFalse(openstack_utils.
1301 create_secgroup_rule(self.neutron_client,
# Runs with logger.error patched, passes the Exception class as the first
# argument of create_secgroup_rule and expects a falsy result.
1307 @mock.patch('functest.utils.openstack_utils.logger.error')
1308 def test_create_secgroup_rule_exception(self, mock_logger_error):
1309 self.assertFalse(openstack_utils.
1310 create_secgroup_rule(Exception,
# With get_security_group_id patched to return 'sg_id', compares the result
# of create_security_group_full (self.neutron_client first) against the
# expected value.
# NOTE(review): the final assertion tests the mock object itself rather than
# mock_logger_info.called (the form other tests in this class use); a patched
# mock is truthy by default, so this check looks like it can never fail --
# confirm and consider asserting `.called`.
1315 @mock.patch('functest.utils.openstack_utils.logger.info')
1316 def test_create_security_group_full_default(self, mock_logger_info):
1317 with mock.patch('functest.utils.openstack_utils.'
1318 'get_security_group_id',
1319 return_value='sg_id'):
1320 self.assertEqual(openstack_utils.
1321 create_security_group_full(self.neutron_client,
1325 self.assertTrue(mock_logger_info)
# Patches get_security_group_id and create_security_group (the latter
# returning False), then compares the result of create_security_group_full
# (self.neutron_client first) against the expected value.
# NOTE(review): the two trailing assertions test the mock objects themselves
# rather than their `.called` attribute (the form other tests in this class
# use); patched mocks are truthy by default, so these checks look like they
# can never fail -- confirm and consider asserting `.called`.
1327 @mock.patch('functest.utils.openstack_utils.logger.info')
1328 @mock.patch('functest.utils.openstack_utils.logger.error')
1329 def test_create_security_group_full_sec_group_fail(self,
1332 with mock.patch('functest.utils.openstack_utils.'
1333 'get_security_group_id',
1335 mock.patch('functest.utils.openstack_utils.'
1336 'create_security_group',
1337 return_value=False):
1338 self.assertEqual(openstack_utils.
1339 create_security_group_full(self.neutron_client,
1343 self.assertTrue(mock_logger_error)
1344 self.assertTrue(mock_logger_info)
# Patches get_security_group_id, create_security_group (returning a dict with
# 'id' and 'name') and create_secgroup_rule (returning False), then compares
# the result of create_security_group_full (self.neutron_client first)
# against the expected value.
# NOTE(review): the three trailing assertions test the mock objects
# themselves rather than their `.called` attribute (the form other tests in
# this class use); patched mocks are truthy by default, so these checks look
# like they can never fail -- confirm and consider asserting `.called`.
1346 @mock.patch('functest.utils.openstack_utils.logger.debug')
1347 @mock.patch('functest.utils.openstack_utils.logger.info')
1348 @mock.patch('functest.utils.openstack_utils.logger.error')
1349 def test_create_security_group_full_secgroup_rule_fail(self,
1353 with mock.patch('functest.utils.openstack_utils.'
1354 'get_security_group_id',
1356 mock.patch('functest.utils.openstack_utils.'
1357 'create_security_group',
1358 return_value={'id': 'sg_id',
1359 'name': 'sg_name'}), \
1360 mock.patch('functest.utils.openstack_utils.'
1361 'create_secgroup_rule',
1362 return_value=False):
1363 self.assertEqual(openstack_utils.
1364 create_security_group_full(self.neutron_client,
1368 self.assertTrue(mock_logger_error)
1369 self.assertTrue(mock_logger_info)
1370 self.assertTrue(mock_logger_debug)
# Expects a truthy result from add_secgroup_to_instance when it is called
# with self.nova_client as its first argument.
1372 def test_add_secgroup_to_instance_default(self):
1373 self.assertTrue(openstack_utils.
1374 add_secgroup_to_instance(self.nova_client,
# Passes the Exception class as the first argument of
# add_secgroup_to_instance (the position where the default test passes a
# client), expects a falsy result and checks that logger.error was called.
1378 @mock.patch('functest.utils.openstack_utils.logger.error')
1379 def test_add_secgroup_to_instance_exception(self, mock_logger_error):
1380 self.assertFalse(openstack_utils.
1381 add_secgroup_to_instance(Exception,
1384 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from update_sg_quota when it is called with
# self.neutron_client as its first argument.
1386 def test_update_sg_quota_default(self):
1387 self.assertTrue(openstack_utils.
1388 update_sg_quota(self.neutron_client,
# Passes the Exception class as the first argument of update_sg_quota (the
# position where the default test passes a client), expects a falsy result
# and checks that logger.error was called.
1393 @mock.patch('functest.utils.openstack_utils.logger.error')
1394 def test_update_sg_quota_exception(self, mock_logger_error):
1395 self.assertFalse(openstack_utils.
1396 update_sg_quota(Exception,
1400 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_security_group when it is called with
# self.neutron_client as its first argument.
1402 def test_delete_security_group_default(self):
1403 self.assertTrue(openstack_utils.
1404 delete_security_group(self.neutron_client,
# Passes the Exception class as the first argument of delete_security_group
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1407 @mock.patch('functest.utils.openstack_utils.logger.error')
1408 def test_delete_security_group_exception(self, mock_logger_error):
1409 self.assertFalse(openstack_utils.
1410 delete_security_group(Exception,
1412 self.assertTrue(mock_logger_error.called)
# Compares get_images(self.glance_client) against the expected value.
1414 def test_get_images_default(self):
1415 self.assertEqual(openstack_utils.
1416 get_images(self.glance_client),
# Calls get_images with the Exception class instead of a client, compares
# the result against the expected value and checks that logger.error was
# called.
1419 @mock.patch('functest.utils.openstack_utils.logger.error')
1420 def test_get_images_exception(self, mock_logger_error):
1421 self.assertEqual(openstack_utils.
1422 get_images(Exception),
1424 self.assertTrue(mock_logger_error.called)
# Compares the result of get_image_id, called with self.glance_client as its
# first argument, against the expected value.
1426 def test_get_image_id_default(self):
1427 self.assertEqual(openstack_utils.
1428 get_image_id(self.glance_client,
1432 # create_glance_image, get_or_create_image
# Patches an openstack_utils attribute to return False, compares the result
# of create_glance_image (self.glance_client first) against the expected
# value and checks that logger.error was called.
# NOTE(review): the method name says "file_present", yet the patch returns
# False and an error log is expected -- presumably this covers the case where
# the image file is absent; confirm and consider renaming.
1433 @mock.patch('functest.utils.openstack_utils.logger.error')
1434 def test_create_glance_image_file_present(self, mock_logger_error):
1435 with mock.patch('functest.utils.openstack_utils.'
1437 return_value=False):
1438 self.assertEqual(openstack_utils.
1439 create_glance_image(self.glance_client,
1443 self.assertTrue(mock_logger_error.called)
# Patches an openstack_utils attribute to return True and get_image_id to
# return 'image_id', compares the result of create_glance_image
# (self.glance_client first) against the expected value and checks that
# logger.info was called.
1445 @mock.patch('functest.utils.openstack_utils.logger.info')
1446 def test_create_glance_image_already_exist(self, mock_logger_info):
1447 with mock.patch('functest.utils.openstack_utils.'
1449 return_value=True), \
1450 mock.patch('functest.utils.openstack_utils.get_image_id',
1451 return_value='image_id'):
1452 self.assertEqual(openstack_utils.
1453 create_glance_image(self.glance_client,
1457 self.assertTrue(mock_logger_info.called)
# Patches an openstack_utils attribute to return True, patches get_image_id,
# and replaces the builtin open with mock_open(read_data='1') bound to `m`.
# Compares the result of create_glance_image (self.glance_client first)
# against the expected value, then checks that open was called exactly once
# with 'file_path' and that logger.info was called.
1459 @mock.patch('functest.utils.openstack_utils.logger.info')
1460 def test_create_glance_image_default(self, mock_logger_info):
1461 with mock.patch('functest.utils.openstack_utils.'
1463 return_value=True), \
1464 mock.patch('functest.utils.openstack_utils.get_image_id',
1466 mock.patch('six.moves.builtins.open',
1467 mock.mock_open(read_data='1')) as m:
1468 self.assertEqual(openstack_utils.
1469 create_glance_image(self.glance_client,
1473 m.assert_called_once_with('file_path')
1474 self.assertTrue(mock_logger_info.called)
# Patches an openstack_utils attribute to return True and makes get_image_id
# raise Exception, compares the result of create_glance_image
# (self.glance_client first) against the expected value and checks that
# logger.error was called.
1476 @mock.patch('functest.utils.openstack_utils.logger.error')
1477 def test_create_glance_image_exception(self, mock_logger_error):
1478 with mock.patch('functest.utils.openstack_utils.'
1480 return_value=True), \
1481 mock.patch('functest.utils.openstack_utils.get_image_id',
1482 side_effect=Exception):
1483 self.assertEqual(openstack_utils.
1484 create_glance_image(self.glance_client,
1488 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_glance_image when it is called with
# self.nova_client as its first argument.
# NOTE(review): the other glance image tests in this class pass
# self.glance_client; using self.nova_client here may be unintended --
# confirm against the signature of delete_glance_image.
1490 def test_delete_glance_image_default(self):
1491 self.assertTrue(openstack_utils.
1492 delete_glance_image(self.nova_client,
# Passes the Exception class as the first argument of delete_glance_image
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1495 @mock.patch('functest.utils.openstack_utils.logger.error')
1496 def test_delete_glance_image_exception(self, mock_logger_error):
1497 self.assertFalse(openstack_utils.
1498 delete_glance_image(Exception,
1500 self.assertTrue(mock_logger_error.called)
# Compares get_volumes(self.cinder_client) against the expected value.
1502 def test_get_volumes_default(self):
1503 self.assertEqual(openstack_utils.
1504 get_volumes(self.cinder_client),
# Calls get_volumes with the Exception class instead of a client, compares
# the result against the expected value and checks that logger.error was
# called.
1507 @mock.patch('functest.utils.openstack_utils.logger.error')
1508 def test_get_volumes_exception(self, mock_logger_error):
1509 self.assertEqual(openstack_utils.
1510 get_volumes(Exception),
1512 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from update_cinder_quota when it is called with
# self.cinder_client as its first argument.
1514 def test_update_cinder_quota_default(self):
1515 self.assertTrue(openstack_utils.
1516 update_cinder_quota(self.cinder_client,
# Passes the Exception class as the first argument of update_cinder_quota
# (the position where the default test passes a client), expects a falsy
# result and checks that logger.error was called.
1522 @mock.patch('functest.utils.openstack_utils.logger.error')
1523 def test_update_cinder_quota_exception(self, mock_logger_error):
1524 self.assertFalse(openstack_utils.
1525 update_cinder_quota(Exception,
1530 self.assertTrue(mock_logger_error.called)
# Makes two delete_volume calls, each with self.cinder_client as the first
# argument, and expects a truthy result from both.
1532 def test_delete_volume_default(self):
1533 self.assertTrue(openstack_utils.
1534 delete_volume(self.cinder_client,
1538 self.assertTrue(openstack_utils.
1539 delete_volume(self.cinder_client,
# Passes the Exception class as the first argument of delete_volume (the
# position where the default test passes a client), expects a falsy result
# and checks that logger.error was called.
1543 @mock.patch('functest.utils.openstack_utils.logger.error')
1544 def test_delete_volume_exception(self, mock_logger_error):
1545 self.assertFalse(openstack_utils.
1546 delete_volume(Exception,
1549 self.assertTrue(mock_logger_error.called)
# Compares get_tenants(self.keystone_client) against the expected value
# twice: once with is_keystone_v3 patched to return True, once with it
# patched to return False.
1551 def test_get_tenants_default(self):
1552 with mock.patch('functest.utils.openstack_utils.'
1553 'is_keystone_v3', return_value=True):
1554 self.assertEqual(openstack_utils.
1555 get_tenants(self.keystone_client),
1557 with mock.patch('functest.utils.openstack_utils.'
1558 'is_keystone_v3', return_value=False):
1559 self.assertEqual(openstack_utils.
1560 get_tenants(self.keystone_client),
# Calls get_tenants with the Exception class instead of a client, compares
# the result against the expected value and checks that logger.error was
# called.
1563 @mock.patch('functest.utils.openstack_utils.logger.error')
1564 def test_get_tenants_exception(self, mock_logger_error):
1565 self.assertEqual(openstack_utils.
1566 get_tenants(Exception),
1568 self.assertTrue(mock_logger_error.called)
# Compares get_users(self.keystone_client) against the expected value.
1570 def test_get_users_default(self):
1571 self.assertEqual(openstack_utils.
1572 get_users(self.keystone_client),
# Calls get_users with the Exception class instead of a client, compares the
# result against the expected value and checks that logger.error was called.
1575 @mock.patch('functest.utils.openstack_utils.logger.error')
1576 def test_get_users_exception(self, mock_logger_error):
1577 self.assertEqual(openstack_utils.
1578 get_users(Exception),
1580 self.assertTrue(mock_logger_error.called)
# Compares the result of get_tenant_id, called with self.keystone_client as
# its first argument, against the expected value.
1582 def test_get_tenant_id_default(self):
1583 self.assertEqual(openstack_utils.
1584 get_tenant_id(self.keystone_client,
# Compares the result of get_user_id, called with self.keystone_client as
# its first argument, against the expected value.
1588 def test_get_user_id_default(self):
1589 self.assertEqual(openstack_utils.
1590 get_user_id(self.keystone_client,
# Compares the result of get_role_id, called with self.keystone_client as
# its first argument, against the expected value.
1594 def test_get_role_id_default(self):
1595 self.assertEqual(openstack_utils.
1596 get_role_id(self.keystone_client,
# Compares the result of get_domain_id, called with self.keystone_client as
# its first argument, against the expected value.
1600 def test_get_domain_id_default(self):
1601 self.assertEqual(openstack_utils.
1602 get_domain_id(self.keystone_client,
# Compares the result of create_tenant (self.keystone_client first) against
# the expected value twice: with is_keystone_v3 patched to return True, after
# setting CONST's OS_PROJECT_DOMAIN_NAME attribute to 'Default', and with
# is_keystone_v3 patched to return False.
# NOTE(review): the CONST attribute is written on the imported CONST object
# and nothing visible here restores it, so the value may leak into other
# tests -- confirm whether a cleanup is needed.
1606 def test_create_tenant_default(self):
1607 with mock.patch('functest.utils.openstack_utils.'
1608 'is_keystone_v3', return_value=True):
1609 CONST.__setattr__('OS_PROJECT_DOMAIN_NAME', 'Default')
1610 self.assertEqual(openstack_utils.
1611 create_tenant(self.keystone_client,
1615 with mock.patch('functest.utils.openstack_utils.'
1616 'is_keystone_v3', return_value=False):
1617 self.assertEqual(openstack_utils.
1618 create_tenant(self.keystone_client,
# Passes the Exception class as the first argument of create_tenant (the
# position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1623 @mock.patch('functest.utils.openstack_utils.logger.error')
1624 def test_create_tenant_exception(self, mock_logger_error):
1625 self.assertEqual(openstack_utils.
1626 create_tenant(Exception,
1630 self.assertTrue(mock_logger_error.called)
# Compares the result of create_user (self.keystone_client first) against
# the expected value twice: once with is_keystone_v3 patched to return True,
# once with it patched to return False.
1632 def test_create_user_default(self):
1633 with mock.patch('functest.utils.openstack_utils.'
1634 'is_keystone_v3', return_value=True):
1635 self.assertEqual(openstack_utils.
1636 create_user(self.keystone_client,
1642 with mock.patch('functest.utils.openstack_utils.'
1643 'is_keystone_v3', return_value=False):
1644 self.assertEqual(openstack_utils.
1645 create_user(self.keystone_client,
# Passes the Exception class as the first argument of create_user (the
# position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1652 @mock.patch('functest.utils.openstack_utils.logger.error')
1653 def test_create_user_exception(self, mock_logger_error):
1654 self.assertEqual(openstack_utils.
1655 create_user(Exception,
1661 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from add_role_user (self.keystone_client first)
# twice: once with is_keystone_v3 patched to return True, once with it
# patched to return False.
1663 def test_add_role_user_default(self):
1664 with mock.patch('functest.utils.openstack_utils.'
1665 'is_keystone_v3', return_value=True):
1666 self.assertTrue(openstack_utils.
1667 add_role_user(self.keystone_client,
1672 with mock.patch('functest.utils.openstack_utils.'
1673 'is_keystone_v3', return_value=False):
1674 self.assertTrue(openstack_utils.
1675 add_role_user(self.keystone_client,
# Passes the Exception class as the first argument of add_role_user (the
# position where the default test passes a client), expects a falsy result
# and checks that logger.error was called.
1680 @mock.patch('functest.utils.openstack_utils.logger.error')
1681 def test_add_role_user_exception(self, mock_logger_error):
1682 self.assertFalse(openstack_utils.
1683 add_role_user(Exception,
1687 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_tenant (self.keystone_client first)
# twice: once with is_keystone_v3 patched to return True, once with it
# patched to return False.
1689 def test_delete_tenant_default(self):
1690 with mock.patch('functest.utils.openstack_utils.'
1691 'is_keystone_v3', return_value=True):
1692 self.assertTrue(openstack_utils.
1693 delete_tenant(self.keystone_client,
1696 with mock.patch('functest.utils.openstack_utils.'
1697 'is_keystone_v3', return_value=False):
1698 self.assertTrue(openstack_utils.
1699 delete_tenant(self.keystone_client,
# Passes the Exception class as the first argument of delete_tenant (the
# position where the default test passes a client), expects a falsy result
# and checks that logger.error was called.
1702 @mock.patch('functest.utils.openstack_utils.logger.error')
1703 def test_delete_tenant_exception(self, mock_logger_error):
1704 self.assertFalse(openstack_utils.
1705 delete_tenant(Exception,
1707 self.assertTrue(mock_logger_error.called)
# Expects a truthy result from delete_user when it is called with
# self.keystone_client as its first argument.
1709 def test_delete_user_default(self):
1710 self.assertTrue(openstack_utils.
1711 delete_user(self.keystone_client,
# Passes the Exception class as the first argument of delete_user (the
# position where the default test passes a client), expects a falsy result
# and checks that logger.error was called.
1714 @mock.patch('functest.utils.openstack_utils.logger.error')
1715 def test_delete_user_exception(self, mock_logger_error):
1716 self.assertFalse(openstack_utils.
1717 delete_user(Exception,
1719 self.assertTrue(mock_logger_error.called)
# With is_keystone_v3 patched to return True, compares the result of
# get_resource (self.heat_client first) against the expected value.
1721 def test_get_resource_default(self):
1722 with mock.patch('functest.utils.openstack_utils.'
1723 'is_keystone_v3', return_value=True):
1724 self.assertEqual(openstack_utils.
1725 get_resource(self.heat_client,
# Passes the Exception class as the first argument of get_resource (the
# position where the default test passes a client), compares the result
# against the expected value and checks that logger.error was called.
1730 @mock.patch('functest.utils.openstack_utils.logger.error')
1731 def test_get_resource_exception(self, mock_logger_error):
1732 self.assertEqual(openstack_utils.
1733 get_resource(Exception,
1737 self.assertTrue(mock_logger_error.called)
# Patches one openstack_utils attribute to return 'user_id' and get_tenant_id
# to return 'tenant_id', then expects a falsy result from
# get_or_create_user_for_vnf called with self.keystone_client first.
1739 def test_get_or_create_user_for_vnf_get(self):
1740 with mock.patch('functest.utils.openstack_utils.'
1742 return_value='user_id'), \
1743 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1744 return_value='tenant_id'):
1745 self.assertFalse(openstack_utils.
1746 get_or_create_user_for_vnf(self.keystone_client,
# Patches one openstack_utils attribute to return None and get_tenant_id to
# return 'tenant_id', then expects a truthy result from
# get_or_create_user_for_vnf called with self.keystone_client first.
1749 def test_get_or_create_user_for_vnf_create(self):
1750 with mock.patch('functest.utils.openstack_utils.'
1752 return_value=None), \
1753 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1754 return_value='tenant_id'):
1755 self.assertTrue(openstack_utils.
1756 get_or_create_user_for_vnf(self.keystone_client,
# Patches an openstack_utils attribute so that calling it raises Exception.
# NOTE(review): assertRaises is given only the exception class -- no callable
# and no `with` block -- so it just returns an unused context manager; no
# openstack_utils function is invoked and nothing is verified. Presumably the
# intent was to assert that get_or_create_user_for_vnf raises; confirm and
# pass the callable and its arguments to assertRaises.
1759 def test_get_or_create_user_for_vnf_error_get_user_id(self):
1760 with mock.patch('functest.utils.openstack_utils.'
1762 side_effect=Exception):
1763 self.assertRaises(Exception)
# Patches one openstack_utils attribute to return 'user_id' and get_tenant_id
# with side_effect='Exception'.
# NOTE(review): side_effect is the string 'Exception', not the Exception
# class; mock treats a string as an iterable of return values, so the patched
# get_tenant_id would not raise -- confirm and use side_effect=Exception.
# NOTE(review): assertRaises is given only the exception class -- no callable
# and no `with` block -- so no openstack_utils function is invoked and
# nothing is verified; confirm the intended call and pass it to assertRaises.
1765 def test_get_or_create_user_for_vnf_error_get_tenant_id(self):
1766 with mock.patch('functest.utils.openstack_utils.'
1768 return_value='user_id'), \
1769 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1770 side_effect='Exception'):
1771 self.assertRaises(Exception)
# With an openstack_utils attribute patched to return 'tenant_id', calls
# get_or_create_tenant_for_vnf with self.keystone_client, 'tenant_name' and
# 'tenant_description'.
1773 def test_get_or_create_tenant_for_vnf_get(self):
1774 with mock.patch('functest.utils.openstack_utils.'
1776 return_value='tenant_id'):
1778 openstack_utils.get_or_create_tenant_for_vnf(
1779 self.keystone_client, 'tenant_name', 'tenant_description'))
# With get_tenant_id patched, calls get_or_create_tenant_for_vnf with
# self.keystone_client, 'tenant_name' and 'tenant_description'.
1781 def test_get_or_create_tenant_for_vnf_create(self):
1782 with mock.patch('functest.utils.openstack_utils.get_tenant_id',
1785 openstack_utils.get_or_create_tenant_for_vnf(
1786 self.keystone_client, 'tenant_name', 'tenant_description'))
# Patches an openstack_utils attribute so that calling it raises Exception.
# NOTE(review): assertRaises is given only the exception class -- no callable
# and no `with` block -- so it just returns an unused context manager; no
# openstack_utils function is invoked and nothing is verified. Presumably the
# intent was to assert that get_or_create_tenant_for_vnf raises; confirm and
# pass the callable and its arguments to assertRaises.
1788 def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self):
1789 with mock.patch('functest.utils.openstack_utils.'
1791 side_effect=Exception):
1792 self.assertRaises(Exception)
# Patches an openstack_utils attribute, ft_utils.download_url (returning
# True) and create_glance_image, then checks that
# download_and_add_image_on_glance returns a value equal to False.
1794 def test_download_and_add_image_on_glance_image_creation_failure(self):
1795 with mock.patch('functest.utils.openstack_utils.'
1797 mock.patch('functest.utils.openstack_utils.'
1798 'ft_utils.download_url',
1799 return_value=True), \
1800 mock.patch('functest.utils.openstack_utils.'
1801 'create_glance_image',
1803 resp = openstack_utils.download_and_add_image_on_glance(
1808 self.assertEqual(resp, False)
# Script entry point: disable logging calls of severity CRITICAL and below,
# then run the test cases through unittest with verbosity level 2.
1811 if __name__ == "__main__":
1812 logging.disable(logging.CRITICAL)
1813 unittest.main(verbosity=2)