3 # All rights reserved. This program and the accompanying materials
4 # are made available under the terms of the Apache License, Version 2.0
5 # which accompanies this distribution, and is available at
6 # http://www.apache.org/licenses/LICENSE-2.0
15 from functest.utils import openstack_utils
18 class OSUtilsTesting(unittest.TestCase):
20 def _get_env_cred_dict(self, os_prefix=''):
21 return {'OS_USERNAME': os_prefix + 'username',
22 'OS_PASSWORD': os_prefix + 'password',
23 'OS_AUTH_URL': os_prefix + 'auth_url',
24 'OS_TENANT_NAME': os_prefix + 'tenant_name',
25 'OS_USER_DOMAIN_NAME': os_prefix + 'user_domain_name',
26 'OS_PROJECT_DOMAIN_NAME': os_prefix + 'project_domain_name',
27 'OS_PROJECT_NAME': os_prefix + 'project_name',
28 'OS_ENDPOINT_TYPE': os_prefix + 'endpoint_type',
29 'OS_REGION_NAME': os_prefix + 'region_name',
30 'OS_CACERT': os_prefix + 'https_cacert',
31 'OS_INSECURE': os_prefix + 'https_insecure'}
33 def _get_os_env_vars(self):
34 return {'username': 'test_username', 'password': 'test_password',
35 'auth_url': 'test_auth_url', 'tenant_name': 'test_tenant_name',
36 'user_domain_name': 'test_user_domain_name',
37 'project_domain_name': 'test_project_domain_name',
38 'project_name': 'test_project_name',
39 'endpoint_type': 'test_endpoint_type',
40 'region_name': 'test_region_name',
41 'https_cacert': 'test_https_cacert',
42 'https_insecure': 'test_https_insecure'}
45 self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
46 self.tenant_name = 'test_tenant_name'
47 self.env_cred_dict = self._get_env_cred_dict()
48 self.os_environs = self._get_env_cred_dict(os_prefix='test_')
49 self.os_env_vars = self._get_os_env_vars()
51 mock_obj = mock.Mock()
52 attrs = {'name': 'test_flavor',
55 mock_obj.configure_mock(**attrs)
56 self.flavor = mock_obj
58 mock_obj = mock.Mock()
59 attrs = {'name': 'test_aggregate',
61 'hosts': ['host_name']}
62 mock_obj.configure_mock(**attrs)
63 self.aggregate = mock_obj
65 mock_obj = mock.Mock()
66 attrs = {'id': 'instance_id',
67 'name': 'test_instance',
69 mock_obj.configure_mock(**attrs)
70 self.instance = mock_obj
72 mock_obj = mock.Mock()
73 attrs = {'id': 'azone_id',
74 'zoneName': 'test_azone',
76 mock_obj.configure_mock(**attrs)
77 self.availability_zone = mock_obj
79 mock_obj = mock.Mock()
80 attrs = {'floating_network_id': 'floating_id',
81 'floating_ip_address': 'test_floating_ip'}
82 mock_obj.configure_mock(**attrs)
83 self.floating_ip = mock_obj
85 mock_obj = mock.Mock()
86 attrs = {'id': 'hypervisor_id',
87 'hypervisor_hostname': 'test_hostname',
89 mock_obj.configure_mock(**attrs)
90 self.hypervisor = mock_obj
92 mock_obj = mock.Mock()
93 attrs = {'id': 'image_id',
95 mock_obj.configure_mock(**attrs)
98 mock_obj = mock.Mock()
99 self.mock_return = mock_obj
101 self.nova_client = mock.Mock()
102 attrs = {'servers.list.return_value': [self.instance],
103 'servers.get.return_value': self.instance,
104 'servers.find.return_value': self.instance,
105 'servers.create.return_value': self.instance,
106 'flavors.list.return_value': [self.flavor],
107 'flavors.find.return_value': self.flavor,
108 'servers.add_floating_ip.return_value': mock.Mock(),
109 'servers.force_delete.return_value': mock.Mock(),
110 'aggregates.list.return_value': [self.aggregate],
111 'aggregates.add_host.return_value': mock.Mock(),
112 'aggregates.remove_host.return_value': mock.Mock(),
113 'aggregates.get.return_value': self.aggregate,
114 'aggregates.delete.return_value': mock.Mock(),
115 'availability_zones.list.return_value':
116 [self.availability_zone],
117 'hypervisors.list.return_value': [self.hypervisor],
118 'create.return_value': mock.Mock(),
119 'add_security_group.return_value': mock.Mock(),
120 'images.list.return_value': [self.image],
121 'images.delete.return_value': mock.Mock(),
123 self.nova_client.configure_mock(**attrs)
125 self.glance_client = mock.Mock()
126 attrs = {'images.list.return_value': [self.image],
127 'images.create.return_value': self.image,
128 'images.upload.return_value': mock.Mock()}
129 self.glance_client.configure_mock(**attrs)
131 mock_obj = mock.Mock()
132 attrs = {'id': 'volume_id',
133 'name': 'test_volume'}
134 mock_obj.configure_mock(**attrs)
135 self.volume = mock_obj
137 mock_obj = mock.Mock()
138 attrs = {'id': 'volume_type_id',
139 'name': 'test_volume_type',
141 mock_obj.configure_mock(**attrs)
142 self.volume_types = [mock_obj]
144 mock_obj = mock.Mock()
145 attrs = {'id': 'volume_type_id',
146 'name': 'test_volume_type',
148 mock_obj.configure_mock(**attrs)
149 self.volume_types.append(mock_obj)
151 self.cinder_client = mock.Mock()
152 attrs = {'volumes.list.return_value': [self.volume],
153 'volume_types.list.return_value': self.volume_types,
154 'volume_types.create.return_value': self.volume_types[0],
155 'volume_types.delete.return_value': mock.Mock(),
156 'quotas.update.return_value': mock.Mock(),
157 'volumes.detach.return_value': mock.Mock(),
158 'volumes.force_delete.return_value': mock.Mock(),
159 'volumes.delete.return_value': mock.Mock()
161 self.cinder_client.configure_mock(**attrs)
163 self.resource = mock.Mock()
164 attrs = {'id': 'resource_test_id',
165 'name': 'resource_test_name'
168 self.heat_client = mock.Mock()
169 attrs = {'resources.get.return_value': self.resource}
170 self.heat_client.configure_mock(**attrs)
172 mock_obj = mock.Mock()
173 attrs = {'id': 'tenant_id',
174 'name': 'test_tenant'}
175 mock_obj.configure_mock(**attrs)
176 self.tenant = mock_obj
178 mock_obj = mock.Mock()
179 attrs = {'id': 'user_id',
181 mock_obj.configure_mock(**attrs)
184 mock_obj = mock.Mock()
185 attrs = {'id': 'role_id',
187 mock_obj.configure_mock(**attrs)
190 self.keystone_client = mock.Mock()
191 attrs = {'projects.list.return_value': [self.tenant],
192 'tenants.list.return_value': [self.tenant],
193 'users.list.return_value': [self.user],
194 'roles.list.return_value': [self.role],
195 'projects.create.return_value': self.tenant,
196 'tenants.create.return_value': self.tenant,
197 'users.create.return_value': self.user,
198 'roles.grant.return_value': mock.Mock(),
199 'roles.add_user_role.return_value': mock.Mock(),
200 'projects.delete.return_value': mock.Mock(),
201 'tenants.delete.return_value': mock.Mock(),
202 'users.delete.return_value': mock.Mock(),
204 self.keystone_client.configure_mock(**attrs)
206 self.router = {'id': 'router_id',
207 'name': 'test_router'}
209 self.subnet = {'id': 'subnet_id',
210 'name': 'test_subnet'}
212 self.networks = [{'id': 'network_id',
213 'name': 'test_network',
214 'router:external': False,
216 'subnets': [self.subnet]},
217 {'id': 'network_id1',
218 'name': 'test_network1',
219 'router:external': True,
221 'subnets': [self.subnet]}]
223 self.port = {'id': 'port_id',
226 self.sec_group = {'id': 'sec_group_id',
227 'name': 'test_sec_group'}
229 self.sec_group_rule = {'id': 'sec_group_rule_id',
230 'direction': 'direction',
231 'protocol': 'protocol',
232 'port_range_max': 'port_max',
233 'security_group_id': self.sec_group['id'],
234 'port_range_min': 'port_min'}
235 self.neutron_floatingip = {'id': 'fip_id',
236 'floating_ip_address': 'test_ip'}
237 self.neutron_client = mock.Mock()
238 attrs = {'list_networks.return_value': {'networks': self.networks},
239 'list_routers.return_value': {'routers': [self.router]},
240 'list_ports.return_value': {'ports': [self.port]},
241 'list_subnets.return_value': {'subnets': [self.subnet]},
242 'create_network.return_value': {'network': self.networks[0]},
243 'create_subnet.return_value': {'subnets': [self.subnet]},
244 'create_router.return_value': {'router': self.router},
245 'create_port.return_value': {'port': self.port},
246 'create_floatingip.return_value': {'floatingip':
247 self.neutron_floatingip},
248 'update_network.return_value': mock.Mock(),
249 'update_port.return_value': {'port': self.port},
250 'add_interface_router.return_value': mock.Mock(),
251 'add_gateway_router.return_value': mock.Mock(),
252 'delete_network.return_value': mock.Mock(),
253 'delete_subnet.return_value': mock.Mock(),
254 'delete_router.return_value': mock.Mock(),
255 'delete_port.return_value': mock.Mock(),
256 'remove_interface_router.return_value': mock.Mock(),
257 'remove_gateway_router.return_value': mock.Mock(),
258 'list_security_groups.return_value': {'security_groups':
260 'list_security_group_rules.'
261 'return_value': {'security_group_rules':
262 [self.sec_group_rule]},
263 'create_security_group_rule.return_value': mock.Mock(),
264 'create_security_group.return_value': {'security_group':
266 'update_quota.return_value': mock.Mock(),
267 'delete_security_group.return_value': mock.Mock(),
268 'list_floatingips.return_value': {'floatingips':
270 'delete_floatingip.return_value': mock.Mock(),
272 self.neutron_client.configure_mock(**attrs)
274 self.empty_client = mock.Mock()
275 attrs = {'list_networks.return_value': {'networks': []},
276 'list_routers.return_value': {'routers': []},
277 'list_ports.return_value': {'ports': []},
278 'list_subnets.return_value': {'subnets': []}}
279 self.empty_client.configure_mock(**attrs)
281 @mock.patch('functest.utils.openstack_utils.os.getenv',
283 def test_is_keystone_v3_missing_identity(self, mock_os_getenv):
284 self.assertEqual(openstack_utils.is_keystone_v3(), False)
286 @mock.patch('functest.utils.openstack_utils.os.getenv',
288 def test_is_keystone_v3_default(self, mock_os_getenv):
289 self.assertEqual(openstack_utils.is_keystone_v3(), True)
291 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
293 def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env):
294 exp_resp = self.env_vars
295 exp_resp.extend(['OS_TENANT_NAME'])
296 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
298 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
300 def test_get_rc_env_vars_default(self, mock_get_rc_env):
301 exp_resp = self.env_vars
302 exp_resp.extend(['OS_PROJECT_NAME',
303 'OS_USER_DOMAIN_NAME',
304 'OS_PROJECT_DOMAIN_NAME'])
305 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
307 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
308 def test_check_credentials_missing_env(self, mock_get_rc_env):
309 exp_resp = self.env_vars
310 exp_resp.extend(['OS_TENANT_NAME'])
311 mock_get_rc_env.return_value = exp_resp
312 with mock.patch.dict('functest.utils.openstack_utils.os.environ', {},
314 self.assertEqual(openstack_utils.check_credentials(), False)
316 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
317 def test_check_credentials_default(self, mock_get_rc_env):
318 exp_resp = ['OS_TENANT_NAME']
319 mock_get_rc_env.return_value = exp_resp
320 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
321 {'OS_TENANT_NAME': self.tenant_name},
323 self.assertEqual(openstack_utils.check_credentials(), True)
325 def test_get_env_cred_dict(self):
326 self.assertDictEqual(openstack_utils.get_env_cred_dict(),
329 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
330 def test_get_credentials_default(self, mock_get_rc_env):
331 mock_get_rc_env.return_value = self.env_cred_dict.keys()
332 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
335 self.assertDictEqual(openstack_utils.get_credentials(),
338 def _get_credentials_missing_env(self, var):
339 dic = copy.deepcopy(self.os_environs)
341 with mock.patch('functest.utils.openstack_utils.get_rc_env_vars',
342 return_value=self.env_cred_dict.keys()), \
343 mock.patch.dict('functest.utils.openstack_utils.os.environ',
346 self.assertRaises(openstack_utils.MissingEnvVar,
347 lambda: openstack_utils.get_credentials())
349 def test_get_credentials_missing_username(self):
350 self._get_credentials_missing_env('OS_USERNAME')
352 def test_get_credentials_missing_password(self):
353 self._get_credentials_missing_env('OS_PASSWORD')
355 def test_get_credentials_missing_auth_url(self):
356 self._get_credentials_missing_env('OS_AUTH_URL')
358 def test_get_credentials_missing_tenantname(self):
359 self._get_credentials_missing_env('OS_TENANT_NAME')
361 def test_get_credentials_missing_domainname(self):
362 self._get_credentials_missing_env('OS_USER_DOMAIN_NAME')
364 def test_get_credentials_missing_projectname(self):
365 self._get_credentials_missing_env('OS_PROJECT_NAME')
367 def test_get_credentials_missing_endpoint_type(self):
368 self._get_credentials_missing_env('OS_ENDPOINT_TYPE')
370 def _test_source_credentials(self, msg, key='OS_TENANT_NAME',
377 with mock.patch('__builtin__.open', mock.mock_open(read_data=msg),
379 m.return_value.__iter__ = lambda self: iter(self.readline, '')
380 openstack_utils.source_credentials(f)
381 m.assert_called_once_with(f, 'r')
382 self.assertEqual(os.environ[key], value)
384 def test_source_credentials(self):
385 self._test_source_credentials('OS_TENANT_NAME=admin')
386 self._test_source_credentials('OS_TENANT_NAME= admin')
387 self._test_source_credentials('OS_TENANT_NAME = admin')
388 self._test_source_credentials('OS_TENANT_NAME = "admin"')
389 self._test_source_credentials('export OS_TENANT_NAME=admin')
390 self._test_source_credentials('export OS_TENANT_NAME =admin')
391 self._test_source_credentials('export OS_TENANT_NAME = admin')
392 self._test_source_credentials('export OS_TENANT_NAME = "admin"')
393 self._test_source_credentials('OS_TENANT_NAME', value='')
394 self._test_source_credentials('export OS_TENANT_NAME', value='')
395 # This test will fail as soon as rc_file is fixed
396 self._test_source_credentials(
397 'export "\'OS_TENANT_NAME\'" = "\'admin\'"')
399 @mock.patch('functest.utils.openstack_utils.os.getenv',
401 def test_get_keystone_client_version_missing_env(self, mock_os_getenv):
402 self.assertEqual(openstack_utils.get_keystone_client_version(),
403 openstack_utils.DEFAULT_API_VERSION)
405 @mock.patch('functest.utils.openstack_utils.logger.info')
406 @mock.patch('functest.utils.openstack_utils.os.getenv',
408 def test_get_keystone_client_version_default(self, mock_os_getenv,
410 self.assertEqual(openstack_utils.get_keystone_client_version(),
412 mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is "
413 "set in env as '%s'", '3')
415 @mock.patch('functest.utils.openstack_utils.get_session')
416 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
417 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
419 @mock.patch('functest.utils.openstack_utils.os.getenv',
420 return_value='public')
421 def test_get_keystone_client_with_interface(self, mock_os_getenv,
422 mock_keystoneclient_version,
425 mock_keystone_obj = mock.Mock()
426 mock_session_obj = mock.Mock()
427 mock_key_client.return_value = mock_keystone_obj
428 mock_get_session.return_value = mock_session_obj
429 self.assertEqual(openstack_utils.get_keystone_client(),
431 mock_key_client.assert_called_once_with('3',
432 session=mock_session_obj,
435 @mock.patch('functest.utils.openstack_utils.get_session')
436 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
437 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
439 @mock.patch('functest.utils.openstack_utils.os.getenv',
440 return_value='admin')
441 def test_get_keystone_client_no_interface(self, mock_os_getenv,
442 mock_keystoneclient_version,
445 mock_keystone_obj = mock.Mock()
446 mock_session_obj = mock.Mock()
447 mock_key_client.return_value = mock_keystone_obj
448 mock_get_session.return_value = mock_session_obj
449 self.assertEqual(openstack_utils.get_keystone_client(),
451 mock_key_client.assert_called_once_with('3',
452 session=mock_session_obj,
455 @mock.patch('functest.utils.openstack_utils.os.getenv',
457 def test_get_nova_client_version_missing_env(self, mock_os_getenv):
458 self.assertEqual(openstack_utils.get_nova_client_version(),
459 openstack_utils.DEFAULT_API_VERSION)
461 @mock.patch('functest.utils.openstack_utils.logger.info')
462 @mock.patch('functest.utils.openstack_utils.os.getenv',
464 def test_get_nova_client_version_default(self, mock_os_getenv,
466 self.assertEqual(openstack_utils.get_nova_client_version(),
468 mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is "
469 "set in env as '%s'", '3')
471 def test_get_nova_client(self):
472 mock_nova_obj = mock.Mock()
473 mock_session_obj = mock.Mock()
474 with mock.patch('functest.utils.openstack_utils'
475 '.get_nova_client_version', return_value='3'), \
476 mock.patch('functest.utils.openstack_utils'
477 '.novaclient.Client',
478 return_value=mock_nova_obj) \
479 as mock_nova_client, \
480 mock.patch('functest.utils.openstack_utils.get_session',
481 return_value=mock_session_obj):
482 self.assertEqual(openstack_utils.get_nova_client(),
484 mock_nova_client.assert_called_once_with('3',
485 session=mock_session_obj)
487 @mock.patch('functest.utils.openstack_utils.os.getenv',
489 def test_get_cinder_client_version_missing_env(self, mock_os_getenv):
490 self.assertEqual(openstack_utils.get_cinder_client_version(),
491 openstack_utils.DEFAULT_API_VERSION)
493 @mock.patch('functest.utils.openstack_utils.logger.info')
494 @mock.patch('functest.utils.openstack_utils.os.getenv',
496 def test_get_cinder_client_version_default(self, mock_os_getenv,
498 self.assertEqual(openstack_utils.get_cinder_client_version(),
500 mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is "
501 "set in env as '%s'", '3')
503 def test_get_cinder_client(self):
504 mock_cinder_obj = mock.Mock()
505 mock_session_obj = mock.Mock()
506 with mock.patch('functest.utils.openstack_utils'
507 '.get_cinder_client_version', return_value='3'), \
508 mock.patch('functest.utils.openstack_utils'
509 '.cinderclient.Client',
510 return_value=mock_cinder_obj) \
511 as mock_cind_client, \
512 mock.patch('functest.utils.openstack_utils.get_session',
513 return_value=mock_session_obj):
514 self.assertEqual(openstack_utils.get_cinder_client(),
516 mock_cind_client.assert_called_once_with('3',
517 session=mock_session_obj)
519 @mock.patch('functest.utils.openstack_utils.os.getenv',
521 def test_get_neutron_client_version_missing_env(self, mock_os_getenv):
522 self.assertEqual(openstack_utils.get_neutron_client_version(),
523 openstack_utils.DEFAULT_API_VERSION)
525 @mock.patch('functest.utils.openstack_utils.logger.info')
526 @mock.patch('functest.utils.openstack_utils.os.getenv',
528 def test_get_neutron_client_version_default(self, mock_os_getenv,
530 self.assertEqual(openstack_utils.get_neutron_client_version(),
532 mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is "
533 "set in env as '%s'", '3')
535 def test_get_neutron_client(self):
536 mock_neutron_obj = mock.Mock()
537 mock_session_obj = mock.Mock()
538 with mock.patch('functest.utils.openstack_utils'
539 '.get_neutron_client_version', return_value='3'), \
540 mock.patch('functest.utils.openstack_utils'
541 '.neutronclient.Client',
542 return_value=mock_neutron_obj) \
543 as mock_neut_client, \
544 mock.patch('functest.utils.openstack_utils.get_session',
545 return_value=mock_session_obj):
546 self.assertEqual(openstack_utils.get_neutron_client(),
548 mock_neut_client.assert_called_once_with('3',
549 session=mock_session_obj)
551 @mock.patch('functest.utils.openstack_utils.os.getenv',
553 def test_get_glance_client_version_missing_env(self, mock_os_getenv):
554 self.assertEqual(openstack_utils.get_glance_client_version(),
555 openstack_utils.DEFAULT_API_VERSION)
557 @mock.patch('functest.utils.openstack_utils.logger.info')
558 @mock.patch('functest.utils.openstack_utils.os.getenv',
560 def test_get_glance_client_version_default(self, mock_os_getenv,
562 self.assertEqual(openstack_utils.get_glance_client_version(),
564 mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is "
565 "set in env as '%s'", '3')
567 def test_get_glance_client(self):
568 mock_glance_obj = mock.Mock()
569 mock_session_obj = mock.Mock()
570 with mock.patch('functest.utils.openstack_utils'
571 '.get_glance_client_version', return_value='3'), \
572 mock.patch('functest.utils.openstack_utils'
573 '.glanceclient.Client',
574 return_value=mock_glance_obj) \
575 as mock_glan_client, \
576 mock.patch('functest.utils.openstack_utils.get_session',
577 return_value=mock_session_obj):
578 self.assertEqual(openstack_utils.get_glance_client(),
580 mock_glan_client.assert_called_once_with('3',
581 session=mock_session_obj)
583 @mock.patch('functest.utils.openstack_utils.os.getenv',
585 def test_get_heat_client_version_missing_env(self, mock_os_getenv):
586 self.assertEqual(openstack_utils.get_heat_client_version(),
587 openstack_utils.DEFAULT_HEAT_API_VERSION)
589 @mock.patch('functest.utils.openstack_utils.logger.info')
590 @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1')
591 def test_get_heat_client_version_default(self, mock_os_getenv,
593 self.assertEqual(openstack_utils.get_heat_client_version(), '1')
594 mock_logger_info.assert_called_once_with(
595 "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1')
597 def test_get_heat_client(self):
598 mock_heat_obj = mock.Mock()
599 mock_session_obj = mock.Mock()
600 with mock.patch('functest.utils.openstack_utils'
601 '.get_heat_client_version', return_value='1'), \
602 mock.patch('functest.utils.openstack_utils'
603 '.heatclient.Client',
604 return_value=mock_heat_obj) \
605 as mock_heat_client, \
606 mock.patch('functest.utils.openstack_utils.get_session',
607 return_value=mock_session_obj):
608 self.assertEqual(openstack_utils.get_heat_client(),
610 mock_heat_client.assert_called_once_with('1',
611 session=mock_session_obj)
613 def test_get_instances_default(self):
614 self.assertEqual(openstack_utils.get_instances(self.nova_client),
617 @mock.patch('functest.utils.openstack_utils.logger.error')
618 def test_get_instances_exception(self, mock_logger_error):
619 self.assertEqual(openstack_utils.
620 get_instances(Exception),
622 self.assertTrue(mock_logger_error.called)
624 def test_get_instance_status_default(self):
625 self.assertEqual(openstack_utils.get_instance_status(self.nova_client,
629 @mock.patch('functest.utils.openstack_utils.logger.error')
630 def test_get_instance_status_exception(self, mock_logger_error):
631 self.assertEqual(openstack_utils.
632 get_instance_status(Exception,
635 self.assertTrue(mock_logger_error.called)
637 def test_get_instance_by_name_default(self):
638 self.assertEqual(openstack_utils.
639 get_instance_by_name(self.nova_client,
643 @mock.patch('functest.utils.openstack_utils.logger.error')
644 def test_get_instance_by_name_exception(self, mock_logger_error):
645 self.assertEqual(openstack_utils.
646 get_instance_by_name(Exception,
649 self.assertTrue(mock_logger_error.called)
651 def test_get_flavor_id_default(self):
652 self.assertEqual(openstack_utils.
653 get_flavor_id(self.nova_client,
657 def test_get_flavor_id_by_ram_range_default(self):
658 self.assertEqual(openstack_utils.
659 get_flavor_id_by_ram_range(self.nova_client,
663 def test_get_aggregates_default(self):
664 self.assertEqual(openstack_utils.
665 get_aggregates(self.nova_client),
668 @mock.patch('functest.utils.openstack_utils.logger.error')
669 def test_get_aggregates_exception(self, mock_logger_error):
670 self.assertEqual(openstack_utils.
671 get_aggregates(Exception),
673 self.assertTrue(mock_logger_error.called)
675 def test_get_aggregate_id_default(self):
676 with mock.patch('functest.utils.openstack_utils.get_aggregates',
677 return_value=[self.aggregate]):
678 self.assertEqual(openstack_utils.
679 get_aggregate_id(self.nova_client,
683 @mock.patch('functest.utils.openstack_utils.logger.error')
684 def test_get_aggregate_id_exception(self, mock_logger_error):
685 self.assertEqual(openstack_utils.
686 get_aggregate_id(Exception,
689 self.assertTrue(mock_logger_error.called)
691 def test_get_availability_zone_names_default(self):
692 with mock.patch('functest.utils.openstack_utils'
693 '.get_availability_zones',
694 return_value=[self.availability_zone]):
695 self.assertEqual(openstack_utils.
696 get_availability_zone_names(self.nova_client),
699 @mock.patch('functest.utils.openstack_utils.logger.error')
700 def test_get_availability_zone_names_exception(self, mock_logger_error):
701 self.assertEqual(openstack_utils.
702 get_availability_zone_names(Exception),
704 self.assertTrue(mock_logger_error.called)
706 def test_get_availability_zones_default(self):
707 self.assertEqual(openstack_utils.
708 get_availability_zones(self.nova_client),
709 [self.availability_zone])
711 @mock.patch('functest.utils.openstack_utils.logger.error')
712 def test_get_availability_zones_exception(self, mock_logger_error):
713 self.assertEqual(openstack_utils.
714 get_availability_zones(Exception),
716 self.assertTrue(mock_logger_error.called)
718 def test_get_floating_ips_default(self):
719 self.assertEqual(openstack_utils.
720 get_floating_ips(self.neutron_client),
723 @mock.patch('functest.utils.openstack_utils.logger.error')
724 def test_get_floating_ips_exception(self, mock_logger_error):
725 self.assertEqual(openstack_utils.
726 get_floating_ips(Exception),
728 self.assertTrue(mock_logger_error.called)
730 def test_get_hypervisors_default(self):
731 self.assertEqual(openstack_utils.
732 get_hypervisors(self.nova_client),
735 @mock.patch('functest.utils.openstack_utils.logger.error')
736 def test_get_hypervisors_exception(self, mock_logger_error):
737 self.assertEqual(openstack_utils.
738 get_hypervisors(Exception),
740 self.assertTrue(mock_logger_error.called)
742 def test_create_aggregate_default(self):
743 self.assertTrue(openstack_utils.
744 create_aggregate(self.nova_client,
748 @mock.patch('functest.utils.openstack_utils.logger.error')
749 def test_create_aggregate_exception(self, mock_logger_error):
750 self.assertEqual(openstack_utils.
751 create_aggregate(Exception,
755 self.assertTrue(mock_logger_error.called)
757 def test_add_host_to_aggregate_default(self):
758 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
759 self.assertTrue(openstack_utils.
760 add_host_to_aggregate(self.nova_client,
764 @mock.patch('functest.utils.openstack_utils.logger.error')
765 def test_add_host_to_aggregate_exception(self, mock_logger_error):
766 self.assertEqual(openstack_utils.
767 add_host_to_aggregate(Exception,
771 self.assertTrue(mock_logger_error.called)
773 def test_create_aggregate_with_host_default(self):
774 with mock.patch('functest.utils.openstack_utils.create_aggregate'), \
775 mock.patch('functest.utils.openstack_utils.'
776 'add_host_to_aggregate'):
777 self.assertTrue(openstack_utils.
778 create_aggregate_with_host(self.nova_client,
783 @mock.patch('functest.utils.openstack_utils.logger.error')
784 def test_create_aggregate_with_host_exception(self, mock_logger_error):
785 with mock.patch('functest.utils.openstack_utils.create_aggregate',
786 side_effect=Exception):
787 self.assertEqual(openstack_utils.
788 create_aggregate_with_host(Exception,
793 self.assertTrue(mock_logger_error.called)
795 def test_create_instance_default(self):
796 with mock.patch('functest.utils.openstack_utils.'
798 return_value=self.nova_client):
799 self.assertEqual(openstack_utils.
800 create_instance('test_flavor',
805 @mock.patch('functest.utils.openstack_utils.logger.error')
806 def test_create_instance_exception(self, mock_logger_error):
807 with mock.patch('functest.utils.openstack_utils.'
809 return_value=self.nova_client):
810 self.nova_client.flavors.find.side_effect = Exception
811 self.assertEqual(openstack_utils.
812 create_instance('test_flavor',
816 self.assertTrue(mock_logger_error)
818 def test_create_floating_ip_default(self):
819 with mock.patch('functest.utils.openstack_utils.'
820 'get_external_net_id',
821 return_value='external_net_id'):
822 exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'}
823 self.assertEqual(openstack_utils.
824 create_floating_ip(self.neutron_client),
827 @mock.patch('functest.utils.openstack_utils.logger.error')
828 def test_create_floating_ip_exception(self, mock_logger_error):
829 with mock.patch('functest.utils.openstack_utils.'
830 'get_external_net_id',
831 return_value='external_net_id'):
832 self.assertEqual(openstack_utils.
833 create_floating_ip(Exception),
835 self.assertTrue(mock_logger_error)
837 def test_add_floating_ip_default(self):
838 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
839 self.assertTrue(openstack_utils.
840 add_floating_ip(self.nova_client,
842 'test_floatingip_addr'))
844 @mock.patch('functest.utils.openstack_utils.logger.error')
845 def test_add_floating_ip_exception(self, mock_logger_error):
846 self.assertFalse(openstack_utils.
847 add_floating_ip(Exception,
849 'test_floatingip_addr'))
850 self.assertTrue(mock_logger_error.called)
852 def test_delete_instance_default(self):
853 self.assertTrue(openstack_utils.
854 delete_instance(self.nova_client,
857 @mock.patch('functest.utils.openstack_utils.logger.error')
858 def test_delete_instance_exception(self, mock_logger_error):
859 self.assertFalse(openstack_utils.
860 delete_instance(Exception,
862 self.assertTrue(mock_logger_error.called)
864 def test_delete_floating_ip_default(self):
865 self.assertTrue(openstack_utils.
866 delete_floating_ip(self.neutron_client,
869 @mock.patch('functest.utils.openstack_utils.logger.error')
870 def test_delete_floating_ip_exception(self, mock_logger_error):
871 self.assertFalse(openstack_utils.
872 delete_floating_ip(Exception,
874 self.assertTrue(mock_logger_error.called)
876 def test_remove_host_from_aggregate_default(self):
877 with mock.patch('functest.utils.openstack_utils.'
879 self.assertTrue(openstack_utils.
880 remove_host_from_aggregate(self.nova_client,
884 @mock.patch('functest.utils.openstack_utils.logger.error')
885 def test_remove_host_from_aggregate_exception(self, mock_logger_error):
886 with mock.patch('functest.utils.openstack_utils.'
887 'get_aggregate_id', side_effect=Exception):
888 self.assertFalse(openstack_utils.
889 remove_host_from_aggregate(self.nova_client,
892 self.assertTrue(mock_logger_error.called)
894 def test_remove_hosts_from_aggregate_default(self):
895 with mock.patch('functest.utils.openstack_utils.'
896 'get_aggregate_id'), \
897 mock.patch('functest.utils.openstack_utils.'
898 'remove_host_from_aggregate',
901 openstack_utils.remove_hosts_from_aggregate(self.nova_client,
903 mock_method.assert_any_call(self.nova_client,
907 def test_delete_aggregate_default(self):
908 with mock.patch('functest.utils.openstack_utils.'
909 'remove_hosts_from_aggregate'):
910 self.assertTrue(openstack_utils.
911 delete_aggregate(self.nova_client,
914 @mock.patch('functest.utils.openstack_utils.logger.error')
915 def test_delete_aggregate_exception(self, mock_logger_error):
916 with mock.patch('functest.utils.openstack_utils.'
917 'remove_hosts_from_aggregate', side_effect=Exception):
918 self.assertFalse(openstack_utils.
919 delete_aggregate(self.nova_client,
921 self.assertTrue(mock_logger_error.called)
923 def test_get_network_list_default(self):
924 self.assertEqual(openstack_utils.
925 get_network_list(self.neutron_client),
928 def test_get_network_list_missing_network(self):
929 self.assertEqual(openstack_utils.
930 get_network_list(self.empty_client),
933 def test_get_router_list_default(self):
934 self.assertEqual(openstack_utils.
935 get_router_list(self.neutron_client),
938 def test_get_router_list_missing_router(self):
939 self.assertEqual(openstack_utils.
940 get_router_list(self.empty_client),
943 def test_get_port_list_default(self):
944 self.assertEqual(openstack_utils.
945 get_port_list(self.neutron_client),
948 def test_get_port_list_missing_port(self):
949 self.assertEqual(openstack_utils.
950 get_port_list(self.empty_client),
953 def test_get_network_id_default(self):
954 self.assertEqual(openstack_utils.
955 get_network_id(self.neutron_client,
959 def test_get_subnet_id_default(self):
960 self.assertEqual(openstack_utils.
961 get_subnet_id(self.neutron_client,
965 def test_get_router_id_default(self):
966 self.assertEqual(openstack_utils.
967 get_router_id(self.neutron_client,
971 def test_get_private_net_default(self):
972 self.assertEqual(openstack_utils.
973 get_private_net(self.neutron_client),
976 def test_get_private_net_missing_net(self):
977 self.assertEqual(openstack_utils.
978 get_private_net(self.empty_client),
981 def test_get_external_net_default(self):
982 self.assertEqual(openstack_utils.
983 get_external_net(self.neutron_client),
986 def test_get_external_net_missing_net(self):
987 self.assertEqual(openstack_utils.
988 get_external_net(self.empty_client),
991 def test_get_external_net_id_default(self):
992 self.assertEqual(openstack_utils.
993 get_external_net_id(self.neutron_client),
996 def test_get_external_net_id_missing_net(self):
997 self.assertEqual(openstack_utils.
998 get_external_net_id(self.empty_client),
1001 def test_check_neutron_net_default(self):
1002 self.assertTrue(openstack_utils.
1003 check_neutron_net(self.neutron_client,
1006 def test_check_neutron_net_missing_net(self):
1007 self.assertFalse(openstack_utils.
1008 check_neutron_net(self.empty_client,
1011 def test_create_neutron_net_default(self):
1012 self.assertEqual(openstack_utils.
1013 create_neutron_net(self.neutron_client,
1017 @mock.patch('functest.utils.openstack_utils.logger.error')
1018 def test_create_neutron_net_exception(self, mock_logger_error):
1019 self.assertEqual(openstack_utils.
1020 create_neutron_net(Exception,
1023 self.assertTrue(mock_logger_error.called)
1025 def test_create_neutron_subnet_default(self):
1026 self.assertEqual(openstack_utils.
1027 create_neutron_subnet(self.neutron_client,
1033 @mock.patch('functest.utils.openstack_utils.logger.error')
1034 def test_create_neutron_subnet_exception(self, mock_logger_error):
1035 self.assertEqual(openstack_utils.
1036 create_neutron_subnet(Exception,
1041 self.assertTrue(mock_logger_error.called)
1043 def test_create_neutron_router_default(self):
1044 self.assertEqual(openstack_utils.
1045 create_neutron_router(self.neutron_client,
1049 @mock.patch('functest.utils.openstack_utils.logger.error')
1050 def test_create_neutron_router_exception(self, mock_logger_error):
1051 self.assertEqual(openstack_utils.
1052 create_neutron_router(Exception,
1055 self.assertTrue(mock_logger_error.called)
1057 def test_create_neutron_port_default(self):
1058 self.assertEqual(openstack_utils.
1059 create_neutron_port(self.neutron_client,
1065 @mock.patch('functest.utils.openstack_utils.logger.error')
1066 def test_create_neutron_port_exception(self, mock_logger_error):
1067 self.assertEqual(openstack_utils.
1068 create_neutron_port(Exception,
1073 self.assertTrue(mock_logger_error.called)
1075 def test_update_neutron_net_default(self):
1076 self.assertTrue(openstack_utils.
1077 update_neutron_net(self.neutron_client,
1080 @mock.patch('functest.utils.openstack_utils.logger.error')
1081 def test_update_neutron_net_exception(self, mock_logger_error):
1082 self.assertFalse(openstack_utils.
1083 update_neutron_net(Exception,
1085 self.assertTrue(mock_logger_error.called)
1087 def test_update_neutron_port_default(self):
1088 self.assertEqual(openstack_utils.
1089 update_neutron_port(self.neutron_client,
1094 @mock.patch('functest.utils.openstack_utils.logger.error')
1095 def test_update_neutron_port_exception(self, mock_logger_error):
1096 self.assertEqual(openstack_utils.
1097 update_neutron_port(Exception,
1101 self.assertTrue(mock_logger_error.called)
1103 def test_add_interface_router_default(self):
1104 self.assertTrue(openstack_utils.
1105 add_interface_router(self.neutron_client,
1109 @mock.patch('functest.utils.openstack_utils.logger.error')
1110 def test_add_interface_router_exception(self, mock_logger_error):
1111 self.assertFalse(openstack_utils.
1112 add_interface_router(Exception,
1115 self.assertTrue(mock_logger_error.called)
1117 def test_add_gateway_router_default(self):
1118 with mock.patch('functest.utils.openstack_utils.'
1119 'get_external_net_id',
1120 return_value='network_id'):
1121 self.assertTrue(openstack_utils.
1122 add_gateway_router(self.neutron_client,
1125 @mock.patch('functest.utils.openstack_utils.logger.error')
1126 def test_add_gateway_router_exception(self, mock_logger_error):
1127 with mock.patch('functest.utils.openstack_utils.'
1128 'get_external_net_id',
1129 return_value='network_id'):
1130 self.assertFalse(openstack_utils.
1131 add_gateway_router(Exception,
1133 self.assertTrue(mock_logger_error.called)
1135 def test_delete_neutron_net_default(self):
1136 self.assertTrue(openstack_utils.
1137 delete_neutron_net(self.neutron_client,
1140 @mock.patch('functest.utils.openstack_utils.logger.error')
1141 def test_delete_neutron_net_exception(self, mock_logger_error):
1142 self.assertFalse(openstack_utils.
1143 delete_neutron_net(Exception,
1145 self.assertTrue(mock_logger_error.called)
1147 def test_delete_neutron_subnet_default(self):
1148 self.assertTrue(openstack_utils.
1149 delete_neutron_subnet(self.neutron_client,
1152 @mock.patch('functest.utils.openstack_utils.logger.error')
1153 def test_delete_neutron_subnet_exception(self, mock_logger_error):
1154 self.assertFalse(openstack_utils.
1155 delete_neutron_subnet(Exception,
1157 self.assertTrue(mock_logger_error.called)
1159 def test_delete_neutron_router_default(self):
1160 self.assertTrue(openstack_utils.
1161 delete_neutron_router(self.neutron_client,
1164 @mock.patch('functest.utils.openstack_utils.logger.error')
1165 def test_delete_neutron_router_exception(self, mock_logger_error):
1166 self.assertFalse(openstack_utils.
1167 delete_neutron_router(Exception,
1169 self.assertTrue(mock_logger_error.called)
1171 def test_delete_neutron_port_default(self):
1172 self.assertTrue(openstack_utils.
1173 delete_neutron_port(self.neutron_client,
1176 @mock.patch('functest.utils.openstack_utils.logger.error')
1177 def test_delete_neutron_port_exception(self, mock_logger_error):
1178 self.assertFalse(openstack_utils.
1179 delete_neutron_port(Exception,
1181 self.assertTrue(mock_logger_error.called)
1183 def test_remove_interface_router_default(self):
1184 self.assertTrue(openstack_utils.
1185 remove_interface_router(self.neutron_client,
1189 @mock.patch('functest.utils.openstack_utils.logger.error')
1190 def test_remove_interface_router_exception(self, mock_logger_error):
1191 self.assertFalse(openstack_utils.
1192 remove_interface_router(Exception,
1195 self.assertTrue(mock_logger_error.called)
1197 def test_remove_gateway_router_default(self):
1198 self.assertTrue(openstack_utils.
1199 remove_gateway_router(self.neutron_client,
1202 @mock.patch('functest.utils.openstack_utils.logger.error')
1203 def test_remove_gateway_router_exception(self, mock_logger_error):
1204 self.assertFalse(openstack_utils.
1205 remove_gateway_router(Exception,
1207 self.assertTrue(mock_logger_error.called)
1209 def test_get_security_groups_default(self):
1210 self.assertEqual(openstack_utils.
1211 get_security_groups(self.neutron_client),
1214 @mock.patch('functest.utils.openstack_utils.logger.error')
1215 def test_get_security_groups_exception(self, mock_logger_error):
1216 self.assertEqual(openstack_utils.
1217 get_security_groups(Exception),
1219 self.assertTrue(mock_logger_error.called)
1221 def test_get_security_group_id_default(self):
1222 with mock.patch('functest.utils.openstack_utils.'
1223 'get_security_groups',
1224 return_value=[self.sec_group]):
1225 self.assertEqual(openstack_utils.
1226 get_security_group_id(self.neutron_client,
1230 def test_get_security_group_rules_default(self):
1231 self.assertEqual(openstack_utils.
1232 get_security_group_rules(self.neutron_client,
1233 self.sec_group['id']),
1234 [self.sec_group_rule])
1236 @mock.patch('functest.utils.openstack_utils.logger.error')
1237 def test_get_security_group_rules_exception(self, mock_logger_error):
1238 self.assertEqual(openstack_utils.
1239 get_security_group_rules(Exception,
1242 self.assertTrue(mock_logger_error.called)
1244 def test_check_security_group_rules_not_exists(self):
1245 self.assertEqual(openstack_utils.
1246 check_security_group_rules(self.neutron_client,
1254 def test_check_security_group_rules_exists(self):
1255 self.assertEqual(openstack_utils.
1256 check_security_group_rules(self.neutron_client,
1257 self.sec_group['id'],
1264 @mock.patch('functest.utils.openstack_utils.logger.error')
1265 def test_check_security_group_rules_exception(self, mock_logger_error):
1266 self.assertEqual(openstack_utils.
1267 check_security_group_rules(Exception,
1274 self.assertTrue(mock_logger_error.called)
1276 def test_create_security_group_default(self):
1277 self.assertEqual(openstack_utils.
1278 create_security_group(self.neutron_client,
1283 @mock.patch('functest.utils.openstack_utils.logger.error')
1284 def test_create_security_group_exception(self, mock_logger_error):
1285 self.assertEqual(openstack_utils.
1286 create_security_group(Exception,
1290 self.assertTrue(mock_logger_error.called)
1292 def test_create_secgroup_rule_default(self):
1293 self.assertTrue(openstack_utils.
1294 create_secgroup_rule(self.neutron_client,
1300 self.assertTrue(openstack_utils.
1301 create_secgroup_rule(self.neutron_client,
1306 @mock.patch('functest.utils.openstack_utils.logger.error')
1307 def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error):
1308 self.assertFalse(openstack_utils.
1309 create_secgroup_rule(self.neutron_client,
1315 @mock.patch('functest.utils.openstack_utils.logger.error')
1316 def test_create_secgroup_rule_exception(self, mock_logger_error):
1317 self.assertFalse(openstack_utils.
1318 create_secgroup_rule(Exception,
1323 @mock.patch('functest.utils.openstack_utils.logger.info')
1324 def test_create_security_group_full_default(self, mock_logger_info):
1325 with mock.patch('functest.utils.openstack_utils.'
1326 'get_security_group_id',
1327 return_value='sg_id'):
1328 self.assertEqual(openstack_utils.
1329 create_security_group_full(self.neutron_client,
1333 self.assertTrue(mock_logger_info)
1335 @mock.patch('functest.utils.openstack_utils.logger.info')
1336 @mock.patch('functest.utils.openstack_utils.logger.error')
1337 def test_create_security_group_full_sec_group_fail(self,
1340 with mock.patch('functest.utils.openstack_utils.'
1341 'get_security_group_id',
1343 mock.patch('functest.utils.openstack_utils.'
1344 'create_security_group',
1345 return_value=False):
1346 self.assertEqual(openstack_utils.
1347 create_security_group_full(self.neutron_client,
1351 self.assertTrue(mock_logger_error)
1352 self.assertTrue(mock_logger_info)
1354 @mock.patch('functest.utils.openstack_utils.logger.debug')
1355 @mock.patch('functest.utils.openstack_utils.logger.info')
1356 @mock.patch('functest.utils.openstack_utils.logger.error')
1357 def test_create_security_group_full_secgroup_rule_fail(self,
1361 with mock.patch('functest.utils.openstack_utils.'
1362 'get_security_group_id',
1364 mock.patch('functest.utils.openstack_utils.'
1365 'create_security_group',
1366 return_value={'id': 'sg_id',
1367 'name': 'sg_name'}), \
1368 mock.patch('functest.utils.openstack_utils.'
1369 'create_secgroup_rule',
1370 return_value=False):
1371 self.assertEqual(openstack_utils.
1372 create_security_group_full(self.neutron_client,
1376 self.assertTrue(mock_logger_error)
1377 self.assertTrue(mock_logger_info)
1378 self.assertTrue(mock_logger_debug)
1380 def test_add_secgroup_to_instance_default(self):
1381 self.assertTrue(openstack_utils.
1382 add_secgroup_to_instance(self.nova_client,
1386 @mock.patch('functest.utils.openstack_utils.logger.error')
1387 def test_add_secgroup_to_instance_exception(self, mock_logger_error):
1388 self.assertFalse(openstack_utils.
1389 add_secgroup_to_instance(Exception,
1392 self.assertTrue(mock_logger_error.called)
1394 def test_update_sg_quota_default(self):
1395 self.assertTrue(openstack_utils.
1396 update_sg_quota(self.neutron_client,
1401 @mock.patch('functest.utils.openstack_utils.logger.error')
1402 def test_update_sg_quota_exception(self, mock_logger_error):
1403 self.assertFalse(openstack_utils.
1404 update_sg_quota(Exception,
1408 self.assertTrue(mock_logger_error.called)
1410 def test_delete_security_group_default(self):
1411 self.assertTrue(openstack_utils.
1412 delete_security_group(self.neutron_client,
1415 @mock.patch('functest.utils.openstack_utils.logger.error')
1416 def test_delete_security_group_exception(self, mock_logger_error):
1417 self.assertFalse(openstack_utils.
1418 delete_security_group(Exception,
1420 self.assertTrue(mock_logger_error.called)
1422 def test_get_images_default(self):
1423 self.assertEqual(openstack_utils.
1424 get_images(self.glance_client),
1427 @mock.patch('functest.utils.openstack_utils.logger.error')
1428 def test_get_images_exception(self, mock_logger_error):
1429 self.assertEqual(openstack_utils.
1430 get_images(Exception),
1432 self.assertTrue(mock_logger_error.called)
1434 def test_get_image_id_default(self):
1435 self.assertEqual(openstack_utils.
1436 get_image_id(self.glance_client,
1440 # create_glance_image, get_or_create_image
1441 @mock.patch('functest.utils.openstack_utils.logger.error')
1442 def test_create_glance_image_file_present(self, mock_logger_error):
1443 with mock.patch('functest.utils.openstack_utils.'
1445 return_value=False):
1446 self.assertEqual(openstack_utils.
1447 create_glance_image(self.glance_client,
1451 self.assertTrue(mock_logger_error.called)
1453 @mock.patch('functest.utils.openstack_utils.logger.info')
1454 def test_create_glance_image_already_exist(self, mock_logger_info):
1455 with mock.patch('functest.utils.openstack_utils.'
1457 return_value=True), \
1458 mock.patch('functest.utils.openstack_utils.get_image_id',
1459 return_value='image_id'):
1460 self.assertEqual(openstack_utils.
1461 create_glance_image(self.glance_client,
1465 self.assertTrue(mock_logger_info.called)
1467 @mock.patch('functest.utils.openstack_utils.logger.info')
1468 def test_create_glance_image_default(self, mock_logger_info):
1469 with mock.patch('functest.utils.openstack_utils.'
1471 return_value=True), \
1472 mock.patch('functest.utils.openstack_utils.get_image_id',
1474 mock.patch('__builtin__.open',
1475 mock.mock_open(read_data='1')) as m:
1476 self.assertEqual(openstack_utils.
1477 create_glance_image(self.glance_client,
1481 m.assert_called_once_with('file_path')
1482 self.assertTrue(mock_logger_info.called)
1484 @mock.patch('functest.utils.openstack_utils.logger.error')
1485 def test_create_glance_image_exception(self, mock_logger_error):
1486 with mock.patch('functest.utils.openstack_utils.'
1488 return_value=True), \
1489 mock.patch('functest.utils.openstack_utils.get_image_id',
1490 side_effect=Exception):
1491 self.assertEqual(openstack_utils.
1492 create_glance_image(self.glance_client,
1496 self.assertTrue(mock_logger_error.called)
1498 def test_delete_glance_image_default(self):
1499 self.assertTrue(openstack_utils.
1500 delete_glance_image(self.nova_client,
1503 @mock.patch('functest.utils.openstack_utils.logger.error')
1504 def test_delete_glance_image_exception(self, mock_logger_error):
1505 self.assertFalse(openstack_utils.
1506 delete_glance_image(Exception,
1508 self.assertTrue(mock_logger_error.called)
1510 def test_get_volumes_default(self):
1511 self.assertEqual(openstack_utils.
1512 get_volumes(self.cinder_client),
1515 @mock.patch('functest.utils.openstack_utils.logger.error')
1516 def test_get_volumes_exception(self, mock_logger_error):
1517 self.assertEqual(openstack_utils.
1518 get_volumes(Exception),
1520 self.assertTrue(mock_logger_error.called)
1522 def test_list_volume_types_default_private(self):
1523 self.assertEqual(openstack_utils.
1524 list_volume_types(self.cinder_client,
1527 [self.volume_types[1]])
1529 def test_list_volume_types_default_public(self):
1530 self.assertEqual(openstack_utils.
1531 list_volume_types(self.cinder_client,
1534 [self.volume_types[0]])
1536 @mock.patch('functest.utils.openstack_utils.logger.error')
1537 def test_list_volume_types_exception(self, mock_logger_error):
1538 self.assertEqual(openstack_utils.
1539 list_volume_types(Exception),
1541 self.assertTrue(mock_logger_error.called)
1543 def test_create_volume_type_default(self):
1544 self.assertEqual(openstack_utils.
1545 create_volume_type(self.cinder_client,
1546 'test_volume_type'),
1547 self.volume_types[0])
1549 @mock.patch('functest.utils.openstack_utils.logger.error')
1550 def test_create_volume_type_exception(self, mock_logger_error):
1551 self.assertEqual(openstack_utils.
1552 create_volume_type(Exception,
1553 'test_volume_type'),
1555 self.assertTrue(mock_logger_error.called)
1557 def test_update_cinder_quota_default(self):
1558 self.assertTrue(openstack_utils.
1559 update_cinder_quota(self.cinder_client,
1565 @mock.patch('functest.utils.openstack_utils.logger.error')
1566 def test_update_cinder_quota_exception(self, mock_logger_error):
1567 self.assertFalse(openstack_utils.
1568 update_cinder_quota(Exception,
1573 self.assertTrue(mock_logger_error.called)
1575 def test_delete_volume_default(self):
1576 self.assertTrue(openstack_utils.
1577 delete_volume(self.cinder_client,
1581 self.assertTrue(openstack_utils.
1582 delete_volume(self.cinder_client,
1586 @mock.patch('functest.utils.openstack_utils.logger.error')
1587 def test_delete_volume_exception(self, mock_logger_error):
1588 self.assertFalse(openstack_utils.
1589 delete_volume(Exception,
1592 self.assertTrue(mock_logger_error.called)
1594 def test_delete_volume_type_default(self):
1595 self.assertTrue(openstack_utils.
1596 delete_volume_type(self.cinder_client,
1597 self.volume_types[0]))
1599 @mock.patch('functest.utils.openstack_utils.logger.error')
1600 def test_delete_volume_type_exception(self, mock_logger_error):
1601 self.assertFalse(openstack_utils.
1602 delete_volume_type(Exception,
1603 self.volume_types[0]))
1604 self.assertTrue(mock_logger_error.called)
1606 def test_get_tenants_default(self):
1607 with mock.patch('functest.utils.openstack_utils.'
1608 'is_keystone_v3', return_value=True):
1609 self.assertEqual(openstack_utils.
1610 get_tenants(self.keystone_client),
1612 with mock.patch('functest.utils.openstack_utils.'
1613 'is_keystone_v3', return_value=False):
1614 self.assertEqual(openstack_utils.
1615 get_tenants(self.keystone_client),
1618 @mock.patch('functest.utils.openstack_utils.logger.error')
1619 def test_get_tenants_exception(self, mock_logger_error):
1620 self.assertEqual(openstack_utils.
1621 get_tenants(Exception),
1623 self.assertTrue(mock_logger_error.called)
1625 def test_get_users_default(self):
1626 self.assertEqual(openstack_utils.
1627 get_users(self.keystone_client),
1630 @mock.patch('functest.utils.openstack_utils.logger.error')
1631 def test_get_users_exception(self, mock_logger_error):
1632 self.assertEqual(openstack_utils.
1633 get_users(Exception),
1635 self.assertTrue(mock_logger_error.called)
1637 def test_get_tenant_id_default(self):
1638 self.assertEqual(openstack_utils.
1639 get_tenant_id(self.keystone_client,
1643 def test_get_user_id_default(self):
1644 self.assertEqual(openstack_utils.
1645 get_user_id(self.keystone_client,
1649 def test_get_role_id_default(self):
1650 self.assertEqual(openstack_utils.
1651 get_role_id(self.keystone_client,
1655 def test_create_tenant_default(self):
1656 with mock.patch('functest.utils.openstack_utils.'
1657 'is_keystone_v3', return_value=True):
1658 self.assertEqual(openstack_utils.
1659 create_tenant(self.keystone_client,
1663 with mock.patch('functest.utils.openstack_utils.'
1664 'is_keystone_v3', return_value=False):
1665 self.assertEqual(openstack_utils.
1666 create_tenant(self.keystone_client,
1671 @mock.patch('functest.utils.openstack_utils.logger.error')
1672 def test_create_tenant_exception(self, mock_logger_error):
1673 self.assertEqual(openstack_utils.
1674 create_tenant(Exception,
1678 self.assertTrue(mock_logger_error.called)
1680 def test_create_user_default(self):
1681 with mock.patch('functest.utils.openstack_utils.'
1682 'is_keystone_v3', return_value=True):
1683 self.assertEqual(openstack_utils.
1684 create_user(self.keystone_client,
1690 with mock.patch('functest.utils.openstack_utils.'
1691 'is_keystone_v3', return_value=False):
1692 self.assertEqual(openstack_utils.
1693 create_user(self.keystone_client,
1700 @mock.patch('functest.utils.openstack_utils.logger.error')
1701 def test_create_user_exception(self, mock_logger_error):
1702 self.assertEqual(openstack_utils.
1703 create_user(Exception,
1709 self.assertTrue(mock_logger_error.called)
1711 def test_add_role_user_default(self):
1712 with mock.patch('functest.utils.openstack_utils.'
1713 'is_keystone_v3', return_value=True):
1714 self.assertTrue(openstack_utils.
1715 add_role_user(self.keystone_client,
1720 with mock.patch('functest.utils.openstack_utils.'
1721 'is_keystone_v3', return_value=False):
1722 self.assertTrue(openstack_utils.
1723 add_role_user(self.keystone_client,
1728 @mock.patch('functest.utils.openstack_utils.logger.error')
1729 def test_add_role_user_exception(self, mock_logger_error):
1730 self.assertFalse(openstack_utils.
1731 add_role_user(Exception,
1735 self.assertTrue(mock_logger_error.called)
1737 def test_delete_tenant_default(self):
1738 with mock.patch('functest.utils.openstack_utils.'
1739 'is_keystone_v3', return_value=True):
1740 self.assertTrue(openstack_utils.
1741 delete_tenant(self.keystone_client,
1744 with mock.patch('functest.utils.openstack_utils.'
1745 'is_keystone_v3', return_value=False):
1746 self.assertTrue(openstack_utils.
1747 delete_tenant(self.keystone_client,
1750 @mock.patch('functest.utils.openstack_utils.logger.error')
1751 def test_delete_tenant_exception(self, mock_logger_error):
1752 self.assertFalse(openstack_utils.
1753 delete_tenant(Exception,
1755 self.assertTrue(mock_logger_error.called)
1757 def test_delete_user_default(self):
1758 self.assertTrue(openstack_utils.
1759 delete_user(self.keystone_client,
1762 @mock.patch('functest.utils.openstack_utils.logger.error')
1763 def test_delete_user_exception(self, mock_logger_error):
1764 self.assertFalse(openstack_utils.
1765 delete_user(Exception,
1767 self.assertTrue(mock_logger_error.called)
1769 def test_get_resource_default(self):
1770 with mock.patch('functest.utils.openstack_utils.'
1771 'is_keystone_v3', return_value=True):
1772 self.assertEqual(openstack_utils.
1773 get_resource(self.heat_client,
1778 @mock.patch('functest.utils.openstack_utils.logger.error')
1779 def test_get_resource_exception(self, mock_logger_error):
1780 self.assertEqual(openstack_utils.
1781 get_resource(Exception,
1785 self.assertTrue(mock_logger_error.called)
1787 def test_get_or_create_user_for_vnf_get(self):
1788 with mock.patch('functest.utils.openstack_utils.'
1790 return_value='user_id'), \
1791 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1792 return_value='tenant_id'):
1793 self.assertFalse(openstack_utils.
1794 get_or_create_user_for_vnf(self.keystone_client,
1797 def test_get_or_create_user_for_vnf_create(self):
1798 with mock.patch('functest.utils.openstack_utils.'
1800 return_value=None), \
1801 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1802 return_value='tenant_id'):
1803 self.assertTrue(openstack_utils.
1804 get_or_create_user_for_vnf(self.keystone_client,
1807 def test_get_or_create_user_for_vnf_error_get_user_id(self):
1808 with mock.patch('functest.utils.openstack_utils.'
1810 side_effect=Exception):
1811 self.assertRaises(Exception)
1813 def test_get_or_create_user_for_vnf_error_get_tenant_id(self):
1814 with mock.patch('functest.utils.openstack_utils.'
1816 return_value='user_id'), \
1817 mock.patch('functest.utils.openstack_utils.get_tenant_id',
1818 side_effect='Exception'):
1819 self.assertRaises(Exception)
1821 def test_get_or_create_tenant_for_vnf_get(self):
1822 with mock.patch('functest.utils.openstack_utils.'
1824 return_value='tenant_id'):
1826 openstack_utils.get_or_create_tenant_for_vnf(
1827 self.keystone_client, 'tenant_name', 'tenant_description'))
1829 def test_get_or_create_tenant_for_vnf_create(self):
1830 with mock.patch('functest.utils.openstack_utils.get_tenant_id',
1833 openstack_utils.get_or_create_tenant_for_vnf(
1834 self.keystone_client, 'tenant_name', 'tenant_description'))
1836 def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self):
1837 with mock.patch('functest.utils.openstack_utils.'
1839 side_effect=Exception):
1840 self.assertRaises(Exception)
1842 def test_download_and_add_image_on_glance_image_creation_failure(self):
1843 with mock.patch('functest.utils.openstack_utils.'
1845 mock.patch('functest.utils.openstack_utils.'
1846 'ft_utils.download_url',
1847 return_value=True), \
1848 mock.patch('functest.utils.openstack_utils.'
1849 'create_glance_image',
1851 resp = openstack_utils.download_and_add_image_on_glance(
1856 self.assertEqual(resp, False)
1859 if __name__ == "__main__":
1860 logging.disable(logging.CRITICAL)
1861 unittest.main(verbosity=2)