3 # All rights reserved. This program and the accompanying materials
4 # are made available under the terms of the Apache License, Version 2.0
5 # which accompanies this distribution, and is available at
6 # http://www.apache.org/licenses/LICENSE-2.0
15 from functest.utils import openstack_utils
18 class OSUtilsTesting(unittest.TestCase):
20 logging.disable(logging.CRITICAL)
22 def _get_env_cred_dict(self, os_prefix=''):
23 return {'OS_USERNAME': os_prefix + 'username',
24 'OS_PASSWORD': os_prefix + 'password',
25 'OS_AUTH_URL': os_prefix + 'auth_url',
26 'OS_TENANT_NAME': os_prefix + 'tenant_name',
27 'OS_USER_DOMAIN_NAME': os_prefix + 'user_domain_name',
28 'OS_PROJECT_DOMAIN_NAME': os_prefix + 'project_domain_name',
29 'OS_PROJECT_NAME': os_prefix + 'project_name',
30 'OS_ENDPOINT_TYPE': os_prefix + 'endpoint_type',
31 'OS_REGION_NAME': os_prefix + 'region_name',
32 'OS_CACERT': os_prefix + 'https_cacert'}
34 def _get_os_env_vars(self):
35 return {'username': 'test_username', 'password': 'test_password',
36 'auth_url': 'test_auth_url', 'tenant_name': 'test_tenant_name',
37 'user_domain_name': 'test_user_domain_name',
38 'project_domain_name': 'test_project_domain_name',
39 'project_name': 'test_project_name',
40 'endpoint_type': 'test_endpoint_type',
41 'region_name': 'test_region_name',
42 'https_cacert': 'test_https_cacert'}
45 self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
46 self.tenant_name = 'test_tenant_name'
47 self.env_cred_dict = self._get_env_cred_dict()
48 self.os_environs = self._get_env_cred_dict(os_prefix='test_')
49 self.os_env_vars = self._get_os_env_vars()
51 mock_obj = mock.Mock()
52 attrs = {'name': 'test_flavor',
55 mock_obj.configure_mock(**attrs)
56 self.flavor = mock_obj
58 mock_obj = mock.Mock()
59 attrs = {'name': 'test_aggregate',
61 'hosts': ['host_name']}
62 mock_obj.configure_mock(**attrs)
63 self.aggregate = mock_obj
65 mock_obj = mock.Mock()
66 attrs = {'id': 'instance_id',
67 'name': 'test_instance',
69 mock_obj.configure_mock(**attrs)
70 self.instance = mock_obj
72 mock_obj = mock.Mock()
73 attrs = {'id': 'azone_id',
74 'zoneName': 'test_azone',
76 mock_obj.configure_mock(**attrs)
77 self.availability_zone = mock_obj
79 mock_obj = mock.Mock()
80 attrs = {'id': 'floating_id',
81 'zoneName': 'test_floating_ip',
83 mock_obj.configure_mock(**attrs)
84 self.floating_ip = mock_obj
86 mock_obj = mock.Mock()
87 attrs = {'id': 'hypervisor_id',
88 'hypervisor_hostname': 'test_hostname',
90 mock_obj.configure_mock(**attrs)
91 self.hypervisor = mock_obj
93 mock_obj = mock.Mock()
94 attrs = {'id': 'image_id',
96 mock_obj.configure_mock(**attrs)
99 mock_obj = mock.Mock()
100 self.mock_return = mock_obj
102 self.nova_client = mock.Mock()
103 attrs = {'servers.list.return_value': [self.instance],
104 'servers.get.return_value': self.instance,
105 'servers.find.return_value': self.instance,
106 'servers.create.return_value': self.instance,
107 'flavors.list.return_value': [self.flavor],
108 'flavors.find.return_value': self.flavor,
109 'servers.add_floating_ip.return_value': mock.Mock(),
110 'servers.force_delete.return_value': mock.Mock(),
111 'aggregates.list.return_value': [self.aggregate],
112 'aggregates.add_host.return_value': mock.Mock(),
113 'aggregates.remove_host.return_value': mock.Mock(),
114 'aggregates.get.return_value': self.aggregate,
115 'aggregates.delete.return_value': mock.Mock(),
116 'availability_zones.list.return_value':
117 [self.availability_zone],
118 'floating_ips.list.return_value': [self.floating_ip],
119 'floating_ips.delete.return_value': mock.Mock(),
120 'hypervisors.list.return_value': [self.hypervisor],
121 'create.return_value': mock.Mock(),
122 'add_security_group.return_value': mock.Mock(),
123 'images.list.return_value': [self.image],
124 'images.delete.return_value': mock.Mock(),
126 self.nova_client.configure_mock(**attrs)
128 self.glance_client = mock.Mock()
129 attrs = {'images.list.return_value': [self.image],
130 'images.create.return_value': self.image,
131 'images.upload.return_value': mock.Mock()}
132 self.glance_client.configure_mock(**attrs)
134 mock_obj = mock.Mock()
135 attrs = {'id': 'volume_id',
136 'name': 'test_volume'}
137 mock_obj.configure_mock(**attrs)
138 self.volume = mock_obj
140 mock_obj = mock.Mock()
141 attrs = {'id': 'volume_type_id',
142 'name': 'test_volume_type',
144 mock_obj.configure_mock(**attrs)
145 self.volume_types = [mock_obj]
147 mock_obj = mock.Mock()
148 attrs = {'id': 'volume_type_id',
149 'name': 'test_volume_type',
151 mock_obj.configure_mock(**attrs)
152 self.volume_types.append(mock_obj)
154 self.cinder_client = mock.Mock()
155 attrs = {'volumes.list.return_value': [self.volume],
156 'volume_types.list.return_value': self.volume_types,
157 'volume_types.create.return_value': self.volume_types[0],
158 'volume_types.delete.return_value': mock.Mock(),
159 'quotas.update.return_value': mock.Mock(),
160 'volumes.detach.return_value': mock.Mock(),
161 'volumes.force_delete.return_value': mock.Mock(),
162 'volumes.delete.return_value': mock.Mock()
164 self.cinder_client.configure_mock(**attrs)
166 self.resource = mock.Mock()
167 attrs = {'id': 'resource_test_id',
168 'name': 'resource_test_name'
171 self.heat_client = mock.Mock()
172 attrs = {'resources.get.return_value': self.resource}
173 self.heat_client.configure_mock(**attrs)
175 mock_obj = mock.Mock()
176 attrs = {'id': 'tenant_id',
177 'name': 'test_tenant'}
178 mock_obj.configure_mock(**attrs)
179 self.tenant = mock_obj
181 mock_obj = mock.Mock()
182 attrs = {'id': 'user_id',
184 mock_obj.configure_mock(**attrs)
187 mock_obj = mock.Mock()
188 attrs = {'id': 'role_id',
190 mock_obj.configure_mock(**attrs)
193 self.keystone_client = mock.Mock()
194 attrs = {'projects.list.return_value': [self.tenant],
195 'tenants.list.return_value': [self.tenant],
196 'users.list.return_value': [self.user],
197 'roles.list.return_value': [self.role],
198 'projects.create.return_value': self.tenant,
199 'tenants.create.return_value': self.tenant,
200 'users.create.return_value': self.user,
201 'roles.grant.return_value': mock.Mock(),
202 'roles.add_user_role.return_value': mock.Mock(),
203 'projects.delete.return_value': mock.Mock(),
204 'tenants.delete.return_value': mock.Mock(),
205 'users.delete.return_value': mock.Mock(),
207 self.keystone_client.configure_mock(**attrs)
209 self.router = {'id': 'router_id',
210 'name': 'test_router'}
212 self.subnet = {'id': 'subnet_id',
213 'name': 'test_subnet'}
215 self.networks = [{'id': 'network_id',
216 'name': 'test_network',
217 'router:external': False,
219 'subnets': [self.subnet]},
220 {'id': 'network_id1',
221 'name': 'test_network1',
222 'router:external': True,
224 'subnets': [self.subnet]}]
226 self.port = {'id': 'port_id',
229 self.sec_group = {'id': 'sec_group_id',
230 'name': 'test_sec_group'}
232 self.neutron_floatingip = {'id': 'fip_id',
233 'floating_ip_address': 'test_ip'}
234 self.neutron_client = mock.Mock()
235 attrs = {'list_networks.return_value': {'networks': self.networks},
236 'list_routers.return_value': {'routers': [self.router]},
237 'list_ports.return_value': {'ports': [self.port]},
238 'list_subnets.return_value': {'subnets': [self.subnet]},
239 'create_network.return_value': {'network': self.networks[0]},
240 'create_subnet.return_value': {'subnets': [self.subnet]},
241 'create_router.return_value': {'router': self.router},
242 'create_port.return_value': {'port': self.port},
243 'create_floatingip.return_value': {'floatingip':
244 self.neutron_floatingip},
245 'update_network.return_value': mock.Mock(),
246 'update_port.return_value': {'port': self.port},
247 'add_interface_router.return_value': mock.Mock(),
248 'add_gateway_router.return_value': mock.Mock(),
249 'delete_network.return_value': mock.Mock(),
250 'delete_subnet.return_value': mock.Mock(),
251 'delete_router.return_value': mock.Mock(),
252 'delete_port.return_value': mock.Mock(),
253 'remove_interface_router.return_value': mock.Mock(),
254 'remove_gateway_router.return_value': mock.Mock(),
255 'create_bgpvpn.return_value': self.mock_return,
256 'create_network_association.return_value': self.mock_return,
257 'create_router_association.return_value': self.mock_return,
258 'update_bgpvpn.return_value': self.mock_return,
259 'delete_bgpvpn.return_value': self.mock_return,
260 'show_bgpvpn.return_value': self.mock_return,
261 'list_security_groups.return_value': {'security_groups':
263 'create_security_group_rule.return_value': mock.Mock(),
264 'create_security_group.return_value': {'security_group':
266 'update_quota.return_value': mock.Mock(),
267 'delete_security_group.return_value': mock.Mock()
269 self.neutron_client.configure_mock(**attrs)
271 self.empty_client = mock.Mock()
272 attrs = {'list_networks.return_value': {'networks': []},
273 'list_routers.return_value': {'routers': []},
274 'list_ports.return_value': {'ports': []},
275 'list_subnets.return_value': {'subnets': []}}
276 self.empty_client.configure_mock(**attrs)
278 @mock.patch('functest.utils.openstack_utils.os.getenv',
280 def test_is_keystone_v3_missing_identity(self, mock_os_getenv):
281 self.assertEqual(openstack_utils.is_keystone_v3(), False)
283 @mock.patch('functest.utils.openstack_utils.os.getenv',
285 def test_is_keystone_v3_default(self, mock_os_getenv):
286 self.assertEqual(openstack_utils.is_keystone_v3(), True)
288 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
290 def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env):
291 exp_resp = self.env_vars
292 exp_resp.extend(['OS_TENANT_NAME'])
293 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
295 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
297 def test_get_rc_env_vars_default(self, mock_get_rc_env):
298 exp_resp = self.env_vars
299 exp_resp.extend(['OS_PROJECT_NAME',
300 'OS_USER_DOMAIN_NAME',
301 'OS_PROJECT_DOMAIN_NAME'])
302 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
304 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
305 def test_check_credentials_missing_env(self, mock_get_rc_env):
306 exp_resp = self.env_vars
307 exp_resp.extend(['OS_TENANT_NAME'])
308 mock_get_rc_env.return_value = exp_resp
309 with mock.patch.dict('functest.utils.openstack_utils.os.environ', {},
311 self.assertEqual(openstack_utils.check_credentials(), False)
313 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
314 def test_check_credentials_default(self, mock_get_rc_env):
315 exp_resp = ['OS_TENANT_NAME']
316 mock_get_rc_env.return_value = exp_resp
317 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
318 {'OS_TENANT_NAME': self.tenant_name},
320 self.assertEqual(openstack_utils.check_credentials(), True)
322 def test_get_env_cred_dict(self):
323 self.assertDictEqual(openstack_utils.get_env_cred_dict(),
326 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
327 def test_get_credentials_default(self, mock_get_rc_env):
328 mock_get_rc_env.return_value = self.env_cred_dict.keys()
329 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
332 self.assertDictEqual(openstack_utils.get_credentials(),
335 def _get_credentials_missing_env(self, var):
336 dic = copy.deepcopy(self.os_environs)
338 with mock.patch('functest.utils.openstack_utils.get_rc_env_vars',
339 return_value=self.env_cred_dict.keys()), \
340 mock.patch.dict('functest.utils.openstack_utils.os.environ',
343 self.assertRaises(openstack_utils.MissingEnvVar,
344 lambda: openstack_utils.get_credentials())
346 def test_get_credentials_missing_username(self):
347 self._get_credentials_missing_env('OS_USERNAME')
349 def test_get_credentials_missing_password(self):
350 self._get_credentials_missing_env('OS_PASSWORD')
352 def test_get_credentials_missing_auth_url(self):
353 self._get_credentials_missing_env('OS_AUTH_URL')
355 def test_get_credentials_missing_tenantname(self):
356 self._get_credentials_missing_env('OS_TENANT_NAME')
358 def test_get_credentials_missing_domainname(self):
359 self._get_credentials_missing_env('OS_USER_DOMAIN_NAME')
361 def test_get_credentials_missing_projectname(self):
362 self._get_credentials_missing_env('OS_PROJECT_NAME')
364 def test_get_credentials_missing_endpoint_type(self):
365 self._get_credentials_missing_env('OS_ENDPOINT_TYPE')
367 def _test_source_credentials(self, msg, key='OS_TENANT_NAME',
374 with mock.patch('__builtin__.open', mock.mock_open(read_data=msg),
376 m.return_value.__iter__ = lambda self: iter(self.readline, '')
377 openstack_utils.source_credentials(f)
378 m.assert_called_once_with(f, 'r')
379 self.assertEqual(os.environ[key], value)
381 def test_source_credentials(self):
382 self._test_source_credentials('OS_TENANT_NAME=admin')
383 self._test_source_credentials('OS_TENANT_NAME= admin')
384 self._test_source_credentials('OS_TENANT_NAME = admin')
385 self._test_source_credentials('OS_TENANT_NAME = "admin"')
386 self._test_source_credentials('export OS_TENANT_NAME=admin')
387 self._test_source_credentials('export OS_TENANT_NAME =admin')
388 self._test_source_credentials('export OS_TENANT_NAME = admin')
389 self._test_source_credentials('export OS_TENANT_NAME = "admin"')
390 self._test_source_credentials('OS_TENANT_NAME', value='')
391 self._test_source_credentials('export OS_TENANT_NAME', value='')
392 # This test will fail as soon as rc_file is fixed
393 self._test_source_credentials(
394 'export "\'OS_TENANT_NAME\'" = "\'admin\'"')
396 @mock.patch('functest.utils.openstack_utils.os.getenv',
398 def test_get_keystone_client_version_missing_env(self, mock_os_getenv):
399 self.assertEqual(openstack_utils.get_keystone_client_version(),
400 openstack_utils.DEFAULT_API_VERSION)
402 @mock.patch('functest.utils.openstack_utils.logger.info')
403 @mock.patch('functest.utils.openstack_utils.os.getenv',
405 def test_get_keystone_client_version_default(self, mock_os_getenv,
407 self.assertEqual(openstack_utils.get_keystone_client_version(),
409 mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is "
410 "set in env as '%s'", '3')
412 def test_get_keystone_client(self):
413 mock_keystone_obj = mock.Mock()
414 mock_session_obj = mock.Mock()
415 with mock.patch('functest.utils.openstack_utils'
416 '.get_keystone_client_version', return_value='3'), \
417 mock.patch('functest.utils.openstack_utils'
418 '.keystoneclient.Client',
419 return_value=mock_keystone_obj) \
420 as mock_key_client, \
421 mock.patch('functest.utils.openstack_utils.get_session',
422 return_value=mock_session_obj):
423 self.assertEqual(openstack_utils.get_keystone_client(),
425 mock_key_client.assert_called_once_with('3',
426 session=mock_session_obj)
428 @mock.patch('functest.utils.openstack_utils.os.getenv',
430 def test_get_nova_client_version_missing_env(self, mock_os_getenv):
431 self.assertEqual(openstack_utils.get_nova_client_version(),
432 openstack_utils.DEFAULT_API_VERSION)
434 @mock.patch('functest.utils.openstack_utils.logger.info')
435 @mock.patch('functest.utils.openstack_utils.os.getenv',
437 def test_get_nova_client_version_default(self, mock_os_getenv,
439 self.assertEqual(openstack_utils.get_nova_client_version(),
441 mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is "
442 "set in env as '%s'", '3')
444 def test_get_nova_client(self):
445 mock_nova_obj = mock.Mock()
446 mock_session_obj = mock.Mock()
447 with mock.patch('functest.utils.openstack_utils'
448 '.get_nova_client_version', return_value='3'), \
449 mock.patch('functest.utils.openstack_utils'
450 '.novaclient.Client',
451 return_value=mock_nova_obj) \
452 as mock_nova_client, \
453 mock.patch('functest.utils.openstack_utils.get_session',
454 return_value=mock_session_obj):
455 self.assertEqual(openstack_utils.get_nova_client(),
457 mock_nova_client.assert_called_once_with('3',
458 session=mock_session_obj)
460 @mock.patch('functest.utils.openstack_utils.os.getenv',
462 def test_get_cinder_client_version_missing_env(self, mock_os_getenv):
463 self.assertEqual(openstack_utils.get_cinder_client_version(),
464 openstack_utils.DEFAULT_API_VERSION)
466 @mock.patch('functest.utils.openstack_utils.logger.info')
467 @mock.patch('functest.utils.openstack_utils.os.getenv',
469 def test_get_cinder_client_version_default(self, mock_os_getenv,
471 self.assertEqual(openstack_utils.get_cinder_client_version(),
473 mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is "
474 "set in env as '%s'", '3')
476 def test_get_cinder_client(self):
477 mock_cinder_obj = mock.Mock()
478 mock_session_obj = mock.Mock()
479 with mock.patch('functest.utils.openstack_utils'
480 '.get_cinder_client_version', return_value='3'), \
481 mock.patch('functest.utils.openstack_utils'
482 '.cinderclient.Client',
483 return_value=mock_cinder_obj) \
484 as mock_cind_client, \
485 mock.patch('functest.utils.openstack_utils.get_session',
486 return_value=mock_session_obj):
487 self.assertEqual(openstack_utils.get_cinder_client(),
489 mock_cind_client.assert_called_once_with('3',
490 session=mock_session_obj)
492 @mock.patch('functest.utils.openstack_utils.os.getenv',
494 def test_get_neutron_client_version_missing_env(self, mock_os_getenv):
495 self.assertEqual(openstack_utils.get_neutron_client_version(),
496 openstack_utils.DEFAULT_API_VERSION)
498 @mock.patch('functest.utils.openstack_utils.logger.info')
499 @mock.patch('functest.utils.openstack_utils.os.getenv',
501 def test_get_neutron_client_version_default(self, mock_os_getenv,
503 self.assertEqual(openstack_utils.get_neutron_client_version(),
505 mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is "
506 "set in env as '%s'", '3')
508 def test_get_neutron_client(self):
509 mock_neutron_obj = mock.Mock()
510 mock_session_obj = mock.Mock()
511 with mock.patch('functest.utils.openstack_utils'
512 '.get_neutron_client_version', return_value='3'), \
513 mock.patch('functest.utils.openstack_utils'
514 '.neutronclient.Client',
515 return_value=mock_neutron_obj) \
516 as mock_neut_client, \
517 mock.patch('functest.utils.openstack_utils.get_session',
518 return_value=mock_session_obj):
519 self.assertEqual(openstack_utils.get_neutron_client(),
521 mock_neut_client.assert_called_once_with('3',
522 session=mock_session_obj)
524 @mock.patch('functest.utils.openstack_utils.os.getenv',
526 def test_get_glance_client_version_missing_env(self, mock_os_getenv):
527 self.assertEqual(openstack_utils.get_glance_client_version(),
528 openstack_utils.DEFAULT_API_VERSION)
530 @mock.patch('functest.utils.openstack_utils.logger.info')
531 @mock.patch('functest.utils.openstack_utils.os.getenv',
533 def test_get_glance_client_version_default(self, mock_os_getenv,
535 self.assertEqual(openstack_utils.get_glance_client_version(),
537 mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is "
538 "set in env as '%s'", '3')
540 def test_get_glance_client(self):
541 mock_glance_obj = mock.Mock()
542 mock_session_obj = mock.Mock()
543 with mock.patch('functest.utils.openstack_utils'
544 '.get_glance_client_version', return_value='3'), \
545 mock.patch('functest.utils.openstack_utils'
546 '.glanceclient.Client',
547 return_value=mock_glance_obj) \
548 as mock_glan_client, \
549 mock.patch('functest.utils.openstack_utils.get_session',
550 return_value=mock_session_obj):
551 self.assertEqual(openstack_utils.get_glance_client(),
553 mock_glan_client.assert_called_once_with('3',
554 session=mock_session_obj)
556 @mock.patch('functest.utils.openstack_utils.os.getenv',
558 def test_get_heat_client_version_missing_env(self, mock_os_getenv):
559 self.assertEqual(openstack_utils.get_heat_client_version(),
560 openstack_utils.DEFAULT_HEAT_API_VERSION)
562 @mock.patch('functest.utils.openstack_utils.logger.info')
563 @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1')
564 def test_get_heat_client_version_default(self, mock_os_getenv,
566 self.assertEqual(openstack_utils.get_heat_client_version(), '1')
567 mock_logger_info.assert_called_once_with(
568 "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1')
570 def test_get_heat_client(self):
571 mock_heat_obj = mock.Mock()
572 mock_session_obj = mock.Mock()
573 with mock.patch('functest.utils.openstack_utils'
574 '.get_heat_client_version', return_value='1'), \
575 mock.patch('functest.utils.openstack_utils'
576 '.heatclient.Client',
577 return_value=mock_heat_obj) \
578 as mock_heat_client, \
579 mock.patch('functest.utils.openstack_utils.get_session',
580 return_value=mock_session_obj):
581 self.assertEqual(openstack_utils.get_heat_client(),
583 mock_heat_client.assert_called_once_with('1',
584 session=mock_session_obj)
586 def test_get_instances_default(self):
587 self.assertEqual(openstack_utils.get_instances(self.nova_client),
590 @mock.patch('functest.utils.openstack_utils.logger.error')
591 def test_get_instances_exception(self, mock_logger_error):
592 self.assertEqual(openstack_utils.
593 get_instances(Exception),
595 self.assertTrue(mock_logger_error.called)
597 def test_get_instance_status_default(self):
598 self.assertEqual(openstack_utils.get_instance_status(self.nova_client,
602 @mock.patch('functest.utils.openstack_utils.logger.error')
603 def test_get_instance_status_exception(self, mock_logger_error):
604 self.assertEqual(openstack_utils.
605 get_instance_status(Exception,
608 self.assertTrue(mock_logger_error.called)
610 def test_get_instance_by_name_default(self):
611 self.assertEqual(openstack_utils.
612 get_instance_by_name(self.nova_client,
616 @mock.patch('functest.utils.openstack_utils.logger.error')
617 def test_get_instance_by_name_exception(self, mock_logger_error):
618 self.assertEqual(openstack_utils.
619 get_instance_by_name(Exception,
622 self.assertTrue(mock_logger_error.called)
624 def test_get_flavor_id_default(self):
625 self.assertEqual(openstack_utils.
626 get_flavor_id(self.nova_client,
630 def test_get_flavor_id_by_ram_range_default(self):
631 self.assertEqual(openstack_utils.
632 get_flavor_id_by_ram_range(self.nova_client,
636 def test_get_aggregates_default(self):
637 self.assertEqual(openstack_utils.
638 get_aggregates(self.nova_client),
641 @mock.patch('functest.utils.openstack_utils.logger.error')
642 def test_get_aggregates_exception(self, mock_logger_error):
643 self.assertEqual(openstack_utils.
644 get_aggregates(Exception),
646 self.assertTrue(mock_logger_error.called)
648 def test_get_aggregate_id_default(self):
649 with mock.patch('functest.utils.openstack_utils.get_aggregates',
650 return_value=[self.aggregate]):
651 self.assertEqual(openstack_utils.
652 get_aggregate_id(self.nova_client,
656 @mock.patch('functest.utils.openstack_utils.logger.error')
657 def test_get_aggregate_id_exception(self, mock_logger_error):
658 self.assertEqual(openstack_utils.
659 get_aggregate_id(Exception,
662 self.assertTrue(mock_logger_error.called)
664 def test_get_availability_zone_names_default(self):
665 with mock.patch('functest.utils.openstack_utils'
666 '.get_availability_zones',
667 return_value=[self.availability_zone]):
668 self.assertEqual(openstack_utils.
669 get_availability_zone_names(self.nova_client),
672 @mock.patch('functest.utils.openstack_utils.logger.error')
673 def test_get_availability_zone_names_exception(self, mock_logger_error):
674 self.assertEqual(openstack_utils.
675 get_availability_zone_names(Exception),
677 self.assertTrue(mock_logger_error.called)
def test_get_availability_zones_default(self):
    # get_availability_zones should return the list of zones reported by
    # the nova client fixture.
    zones = openstack_utils.get_availability_zones(self.nova_client)
    expected_zones = [self.availability_zone]
    self.assertEqual(zones, expected_zones)
684 @mock.patch('functest.utils.openstack_utils.logger.error')
685 def test_get_availability_zones_exception(self, mock_logger_error):
686 self.assertEqual(openstack_utils.
687 get_availability_zones(Exception),
689 self.assertTrue(mock_logger_error.called)
691 def test_get_floating_ips_default(self):
692 self.assertEqual(openstack_utils.
693 get_floating_ips(self.nova_client),
696 @mock.patch('functest.utils.openstack_utils.logger.error')
697 def test_get_floating_ips_exception(self, mock_logger_error):
698 self.assertEqual(openstack_utils.
699 get_floating_ips(Exception),
701 self.assertTrue(mock_logger_error.called)
703 def test_get_hypervisors_default(self):
704 self.assertEqual(openstack_utils.
705 get_hypervisors(self.nova_client),
708 @mock.patch('functest.utils.openstack_utils.logger.error')
709 def test_get_hypervisors_exception(self, mock_logger_error):
710 self.assertEqual(openstack_utils.
711 get_hypervisors(Exception),
713 self.assertTrue(mock_logger_error.called)
715 def test_create_aggregate_default(self):
716 self.assertTrue(openstack_utils.
717 create_aggregate(self.nova_client,
721 @mock.patch('functest.utils.openstack_utils.logger.error')
722 def test_create_aggregate_exception(self, mock_logger_error):
723 self.assertEqual(openstack_utils.
724 create_aggregate(Exception,
728 self.assertTrue(mock_logger_error.called)
730 def test_add_host_to_aggregate_default(self):
731 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
732 self.assertTrue(openstack_utils.
733 add_host_to_aggregate(self.nova_client,
737 @mock.patch('functest.utils.openstack_utils.logger.error')
738 def test_add_host_to_aggregate_exception(self, mock_logger_error):
739 self.assertEqual(openstack_utils.
740 add_host_to_aggregate(Exception,
744 self.assertTrue(mock_logger_error.called)
746 def test_create_aggregate_with_host_default(self):
747 with mock.patch('functest.utils.openstack_utils.create_aggregate'), \
748 mock.patch('functest.utils.openstack_utils.'
749 'add_host_to_aggregate'):
750 self.assertTrue(openstack_utils.
751 create_aggregate_with_host(self.nova_client,
756 @mock.patch('functest.utils.openstack_utils.logger.error')
757 def test_create_aggregate_with_host_exception(self, mock_logger_error):
758 with mock.patch('functest.utils.openstack_utils.create_aggregate',
759 side_effect=Exception):
760 self.assertEqual(openstack_utils.
761 create_aggregate_with_host(Exception,
766 self.assertTrue(mock_logger_error.called)
768 def test_create_instance_default(self):
769 with mock.patch('functest.utils.openstack_utils.'
771 return_value=self.nova_client):
772 self.assertEqual(openstack_utils.
773 create_instance('test_flavor',
778 @mock.patch('functest.utils.openstack_utils.logger.error')
779 def test_create_instance_exception(self, mock_logger_error):
780 with mock.patch('functest.utils.openstack_utils.'
782 return_value=self.nova_client):
783 self.nova_client.flavors.find.side_effect = Exception
784 self.assertEqual(openstack_utils.
785 create_instance('test_flavor',
789 self.assertTrue(mock_logger_error)
791 def test_create_floating_ip_default(self):
792 with mock.patch('functest.utils.openstack_utils.'
793 'get_external_net_id',
794 return_value='external_net_id'):
795 exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'}
796 self.assertEqual(openstack_utils.
797 create_floating_ip(self.neutron_client),
800 @mock.patch('functest.utils.openstack_utils.logger.error')
801 def test_create_floating_ip_exception(self, mock_logger_error):
802 with mock.patch('functest.utils.openstack_utils.'
803 'get_external_net_id',
804 return_value='external_net_id'):
805 self.assertEqual(openstack_utils.
806 create_floating_ip(Exception),
808 self.assertTrue(mock_logger_error)
810 def test_add_floating_ip_default(self):
811 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
812 self.assertTrue(openstack_utils.
813 add_floating_ip(self.nova_client,
815 'test_floatingip_addr'))
817 @mock.patch('functest.utils.openstack_utils.logger.error')
818 def test_add_floating_ip_exception(self, mock_logger_error):
819 self.assertFalse(openstack_utils.
820 add_floating_ip(Exception,
822 'test_floatingip_addr'))
823 self.assertTrue(mock_logger_error.called)
825 def test_delete_instance_default(self):
826 self.assertTrue(openstack_utils.
827 delete_instance(self.nova_client,
830 @mock.patch('functest.utils.openstack_utils.logger.error')
831 def test_delete_instance_exception(self, mock_logger_error):
832 self.assertFalse(openstack_utils.
833 delete_instance(Exception,
835 self.assertTrue(mock_logger_error.called)
837 def test_delete_floating_ip_default(self):
838 self.assertTrue(openstack_utils.
839 delete_floating_ip(self.nova_client,
842 @mock.patch('functest.utils.openstack_utils.logger.error')
843 def test_delete_floating_ip_exception(self, mock_logger_error):
844 self.assertFalse(openstack_utils.
845 delete_floating_ip(Exception,
847 self.assertTrue(mock_logger_error.called)
849 def test_remove_host_from_aggregate_default(self):
850 with mock.patch('functest.utils.openstack_utils.'
852 self.assertTrue(openstack_utils.
853 remove_host_from_aggregate(self.nova_client,
857 @mock.patch('functest.utils.openstack_utils.logger.error')
858 def test_remove_host_from_aggregate_exception(self, mock_logger_error):
859 with mock.patch('functest.utils.openstack_utils.'
860 'get_aggregate_id', side_effect=Exception):
861 self.assertFalse(openstack_utils.
862 remove_host_from_aggregate(self.nova_client,
865 self.assertTrue(mock_logger_error.called)
867 def test_remove_hosts_from_aggregate_default(self):
868 with mock.patch('functest.utils.openstack_utils.'
869 'get_aggregate_id'), \
870 mock.patch('functest.utils.openstack_utils.'
871 'remove_host_from_aggregate',
874 openstack_utils.remove_hosts_from_aggregate(self.nova_client,
876 mock_method.assert_any_call(self.nova_client,
880 def test_delete_aggregate_default(self):
881 with mock.patch('functest.utils.openstack_utils.'
882 'remove_hosts_from_aggregate'):
883 self.assertTrue(openstack_utils.
884 delete_aggregate(self.nova_client,
887 @mock.patch('functest.utils.openstack_utils.logger.error')
888 def test_delete_aggregate_exception(self, mock_logger_error):
889 with mock.patch('functest.utils.openstack_utils.'
890 'remove_hosts_from_aggregate', side_effect=Exception):
891 self.assertFalse(openstack_utils.
892 delete_aggregate(self.nova_client,
894 self.assertTrue(mock_logger_error.called)
896 def test_get_network_list_default(self):
897 self.assertEqual(openstack_utils.
898 get_network_list(self.neutron_client),
901 def test_get_network_list_missing_network(self):
902 self.assertEqual(openstack_utils.
903 get_network_list(self.empty_client),
906 def test_get_router_list_default(self):
907 self.assertEqual(openstack_utils.
908 get_router_list(self.neutron_client),
911 def test_get_router_list_missing_router(self):
912 self.assertEqual(openstack_utils.
913 get_router_list(self.empty_client),
916 def test_get_port_list_default(self):
917 self.assertEqual(openstack_utils.
918 get_port_list(self.neutron_client),
921 def test_get_port_list_missing_port(self):
922 self.assertEqual(openstack_utils.
923 get_port_list(self.empty_client),
926 def test_get_network_id_default(self):
927 self.assertEqual(openstack_utils.
928 get_network_id(self.neutron_client,
932 def test_get_subnet_id_default(self):
933 self.assertEqual(openstack_utils.
934 get_subnet_id(self.neutron_client,
938 def test_get_router_id_default(self):
939 self.assertEqual(openstack_utils.
940 get_router_id(self.neutron_client,
944 def test_get_private_net_default(self):
945 self.assertEqual(openstack_utils.
946 get_private_net(self.neutron_client),
949 def test_get_private_net_missing_net(self):
950 self.assertEqual(openstack_utils.
951 get_private_net(self.empty_client),
954 def test_get_external_net_default(self):
955 self.assertEqual(openstack_utils.
956 get_external_net(self.neutron_client),
959 def test_get_external_net_missing_net(self):
960 self.assertEqual(openstack_utils.
961 get_external_net(self.empty_client),
964 def test_get_external_net_id_default(self):
965 self.assertEqual(openstack_utils.
966 get_external_net_id(self.neutron_client),
969 def test_get_external_net_id_missing_net(self):
970 self.assertEqual(openstack_utils.
971 get_external_net_id(self.empty_client),
974 def test_check_neutron_net_default(self):
975 self.assertTrue(openstack_utils.
976 check_neutron_net(self.neutron_client,
# Expects check_neutron_net(self.empty_client, ...) to be falsy;
# the remaining arguments are not visible in this excerpt.
979 def test_check_neutron_net_missing_net(self):
980 self.assertFalse(openstack_utils.
981 check_neutron_net(self.empty_client,
# Compares create_neutron_net(self.neutron_client, ...) with an expected
# value; the remaining arguments and the expected value are not visible here.
984 def test_create_neutron_net_default(self):
985 self.assertEqual(openstack_utils.
986 create_neutron_net(self.neutron_client,
# Passes the Exception class in place of a client to create_neutron_net and
# checks that the patched logger.error was called. The remaining arguments
# and the expected return value are not visible in this excerpt.
990 @mock.patch('functest.utils.openstack_utils.logger.error')
991 def test_create_neutron_net_exception(self, mock_logger_error):
992 self.assertEqual(openstack_utils.
993 create_neutron_net(Exception,
996 self.assertTrue(mock_logger_error.called)
# Compares create_neutron_subnet(self.neutron_client, ...) with an expected
# value; the remaining arguments and the expected value are not visible here.
998 def test_create_neutron_subnet_default(self):
999 self.assertEqual(openstack_utils.
1000 create_neutron_subnet(self.neutron_client,
# Passes the Exception class in place of a client to create_neutron_subnet
# and checks that the patched logger.error was called. The remaining
# arguments and the expected return value are not visible in this excerpt.
1006 @mock.patch('functest.utils.openstack_utils.logger.error')
1007 def test_create_neutron_subnet_exception(self, mock_logger_error):
1008 self.assertEqual(openstack_utils.
1009 create_neutron_subnet(Exception,
1014 self.assertTrue(mock_logger_error.called)
# Compares create_neutron_router(self.neutron_client, ...) with an expected
# value; the remaining arguments and the expected value are not visible here.
1016 def test_create_neutron_router_default(self):
1017 self.assertEqual(openstack_utils.
1018 create_neutron_router(self.neutron_client,
# Passes the Exception class in place of a client to create_neutron_router
# and checks that the patched logger.error was called. The remaining
# arguments and the expected return value are not visible in this excerpt.
1022 @mock.patch('functest.utils.openstack_utils.logger.error')
1023 def test_create_neutron_router_exception(self, mock_logger_error):
1024 self.assertEqual(openstack_utils.
1025 create_neutron_router(Exception,
1028 self.assertTrue(mock_logger_error.called)
# Compares create_neutron_port(self.neutron_client, ...) with an expected
# value; the remaining arguments and the expected value are not visible here.
1030 def test_create_neutron_port_default(self):
1031 self.assertEqual(openstack_utils.
1032 create_neutron_port(self.neutron_client,
# Passes the Exception class in place of a client to create_neutron_port
# and checks that the patched logger.error was called. The remaining
# arguments and the expected return value are not visible in this excerpt.
1038 @mock.patch('functest.utils.openstack_utils.logger.error')
1039 def test_create_neutron_port_exception(self, mock_logger_error):
1040 self.assertEqual(openstack_utils.
1041 create_neutron_port(Exception,
1046 self.assertTrue(mock_logger_error.called)
# Expects update_neutron_net(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1048 def test_update_neutron_net_default(self):
1049 self.assertTrue(openstack_utils.
1050 update_neutron_net(self.neutron_client,
# Passes the Exception class in place of a client to update_neutron_net,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1053 @mock.patch('functest.utils.openstack_utils.logger.error')
1054 def test_update_neutron_net_exception(self, mock_logger_error):
1055 self.assertFalse(openstack_utils.
1056 update_neutron_net(Exception,
1058 self.assertTrue(mock_logger_error.called)
# Compares update_neutron_port(self.neutron_client, ...) with an expected
# value; the remaining arguments and the expected value are not visible here.
1060 def test_update_neutron_port_default(self):
1061 self.assertEqual(openstack_utils.
1062 update_neutron_port(self.neutron_client,
# Passes the Exception class in place of a client to update_neutron_port
# and checks that the patched logger.error was called. The remaining
# arguments and the expected return value are not visible in this excerpt.
1067 @mock.patch('functest.utils.openstack_utils.logger.error')
1068 def test_update_neutron_port_exception(self, mock_logger_error):
1069 self.assertEqual(openstack_utils.
1070 update_neutron_port(Exception,
1074 self.assertTrue(mock_logger_error.called)
# Expects add_interface_router(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1076 def test_add_interface_router_default(self):
1077 self.assertTrue(openstack_utils.
1078 add_interface_router(self.neutron_client,
# Passes the Exception class in place of a client to add_interface_router,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1082 @mock.patch('functest.utils.openstack_utils.logger.error')
1083 def test_add_interface_router_exception(self, mock_logger_error):
1084 self.assertFalse(openstack_utils.
1085 add_interface_router(Exception,
1088 self.assertTrue(mock_logger_error.called)
# With get_external_net_id patched to return 'network_id', expects
# add_gateway_router(self.neutron_client, ...) to be truthy.
# The remaining arguments are not visible in this excerpt.
1090 def test_add_gateway_router_default(self):
1091 with mock.patch('functest.utils.openstack_utils.'
1092 'get_external_net_id',
1093 return_value='network_id'):
1094 self.assertTrue(openstack_utils.
1095 add_gateway_router(self.neutron_client,
# With get_external_net_id patched to return 'network_id', passes the
# Exception class in place of a client to add_gateway_router, expects a
# falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1098 @mock.patch('functest.utils.openstack_utils.logger.error')
1099 def test_add_gateway_router_exception(self, mock_logger_error):
1100 with mock.patch('functest.utils.openstack_utils.'
1101 'get_external_net_id',
1102 return_value='network_id'):
1103 self.assertFalse(openstack_utils.
1104 add_gateway_router(Exception,
1106 self.assertTrue(mock_logger_error.called)
# Expects delete_neutron_net(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1108 def test_delete_neutron_net_default(self):
1109 self.assertTrue(openstack_utils.
1110 delete_neutron_net(self.neutron_client,
# Passes the Exception class in place of a client to delete_neutron_net,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1113 @mock.patch('functest.utils.openstack_utils.logger.error')
1114 def test_delete_neutron_net_exception(self, mock_logger_error):
1115 self.assertFalse(openstack_utils.
1116 delete_neutron_net(Exception,
1118 self.assertTrue(mock_logger_error.called)
# Expects delete_neutron_subnet(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1120 def test_delete_neutron_subnet_default(self):
1121 self.assertTrue(openstack_utils.
1122 delete_neutron_subnet(self.neutron_client,
# Passes the Exception class in place of a client to delete_neutron_subnet,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1125 @mock.patch('functest.utils.openstack_utils.logger.error')
1126 def test_delete_neutron_subnet_exception(self, mock_logger_error):
1127 self.assertFalse(openstack_utils.
1128 delete_neutron_subnet(Exception,
1130 self.assertTrue(mock_logger_error.called)
# Expects delete_neutron_router(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1132 def test_delete_neutron_router_default(self):
1133 self.assertTrue(openstack_utils.
1134 delete_neutron_router(self.neutron_client,
# Passes the Exception class in place of a client to delete_neutron_router,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1137 @mock.patch('functest.utils.openstack_utils.logger.error')
1138 def test_delete_neutron_router_exception(self, mock_logger_error):
1139 self.assertFalse(openstack_utils.
1140 delete_neutron_router(Exception,
1142 self.assertTrue(mock_logger_error.called)
# Expects delete_neutron_port(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1144 def test_delete_neutron_port_default(self):
1145 self.assertTrue(openstack_utils.
1146 delete_neutron_port(self.neutron_client,
# Passes the Exception class in place of a client to delete_neutron_port,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1149 @mock.patch('functest.utils.openstack_utils.logger.error')
1150 def test_delete_neutron_port_exception(self, mock_logger_error):
1151 self.assertFalse(openstack_utils.
1152 delete_neutron_port(Exception,
1154 self.assertTrue(mock_logger_error.called)
# Expects remove_interface_router(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1156 def test_remove_interface_router_default(self):
1157 self.assertTrue(openstack_utils.
1158 remove_interface_router(self.neutron_client,
# Passes the Exception class in place of a client to
# remove_interface_router, expects a falsy result and a call to the
# patched logger.error. The remaining arguments are not visible here.
1162 @mock.patch('functest.utils.openstack_utils.logger.error')
1163 def test_remove_interface_router_exception(self, mock_logger_error):
1164 self.assertFalse(openstack_utils.
1165 remove_interface_router(Exception,
1168 self.assertTrue(mock_logger_error.called)
# Expects remove_gateway_router(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1170 def test_remove_gateway_router_default(self):
1171 self.assertTrue(openstack_utils.
1172 remove_gateway_router(self.neutron_client,
# Passes the Exception class in place of a client to remove_gateway_router,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1175 @mock.patch('functest.utils.openstack_utils.logger.error')
1176 def test_remove_gateway_router_exception(self, mock_logger_error):
1177 self.assertFalse(openstack_utils.
1178 remove_gateway_router(Exception,
1180 self.assertTrue(mock_logger_error.called)
# Compares create_bgpvpn(self.neutron_client) with an expected value;
# the expected value is not visible in this excerpt.
1182 def test_create_bgpvpn(self):
1183 self.assertEqual(openstack_utils.
1184 create_bgpvpn(self.neutron_client),
# Compares create_network_association(self.neutron_client, ...) with an
# expected value; remaining arguments and expected value are not visible.
1187 def test_create_network_association(self):
1188 self.assertEqual(openstack_utils.
1189 create_network_association(self.neutron_client,
# Compares create_router_association(self.neutron_client, ...) with an
# expected value; remaining arguments and expected value are not visible.
1194 def test_create_router_association(self):
1195 self.assertEqual(openstack_utils.
1196 create_router_association(self.neutron_client,
# Compares update_bgpvpn(self.neutron_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1201 def test_update_bgpvpn(self):
1202 self.assertEqual(openstack_utils.
1203 update_bgpvpn(self.neutron_client,
# Compares delete_bgpvpn(self.neutron_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1207 def test_delete_bgpvpn(self):
1208 self.assertEqual(openstack_utils.
1209 delete_bgpvpn(self.neutron_client,
# Compares get_bgpvpn(self.neutron_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1213 def test_get_bgpvpn(self):
1214 self.assertEqual(openstack_utils.
1215 get_bgpvpn(self.neutron_client,
# Patches a name under functest.utils.openstack_utils (the name itself is
# on a line not visible here) to return {'bgpvpn': {'routers': [self.router]}},
# then compares get_bgpvpn_routers(self.neutron_client, ...) with an
# expected value that is also not visible in this excerpt.
1219 def test_get_bgpvpn_routers(self):
1220 with mock.patch('functest.utils.openstack_utils.'
1222 return_value={'bgpvpn':
1223 {'routers': [self.router]}}):
1224 self.assertEqual(openstack_utils.
1225 get_bgpvpn_routers(self.neutron_client,
# Compares get_security_groups(self.neutron_client) with an expected value;
# the expected value is not visible in this excerpt.
1229 def test_get_security_groups_default(self):
1230 self.assertEqual(openstack_utils.
1231 get_security_groups(self.neutron_client),
# Passes the Exception class in place of a client to get_security_groups
# and checks that the patched logger.error was called.
# The expected return value is not visible in this excerpt.
1234 @mock.patch('functest.utils.openstack_utils.logger.error')
1235 def test_get_security_groups_exception(self, mock_logger_error):
1236 self.assertEqual(openstack_utils.
1237 get_security_groups(Exception),
1239 self.assertTrue(mock_logger_error.called)
# With get_security_groups patched to return [self.sec_group], compares
# get_security_group_id(self.neutron_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1241 def test_get_security_group_id_default(self):
1242 with mock.patch('functest.utils.openstack_utils.'
1243 'get_security_groups',
1244 return_value=[self.sec_group]):
1245 self.assertEqual(openstack_utils.
1246 get_security_group_id(self.neutron_client,
# Compares create_security_group(self.neutron_client, ...) with an expected
# value; the remaining arguments and the expected value are not visible here.
1250 def test_create_security_group_default(self):
1251 self.assertEqual(openstack_utils.
1252 create_security_group(self.neutron_client,
# Passes the Exception class in place of a client to create_security_group
# and checks that the patched logger.error was called. The remaining
# arguments and the expected return value are not visible in this excerpt.
1257 @mock.patch('functest.utils.openstack_utils.logger.error')
1258 def test_create_security_group_exception(self, mock_logger_error):
1259 self.assertEqual(openstack_utils.
1260 create_security_group(Exception,
1264 self.assertTrue(mock_logger_error.called)
# Makes two create_secgroup_rule(self.neutron_client, ...) calls and expects
# each to be truthy; the argument lists that distinguish the two calls are
# not visible in this excerpt.
1266 def test_create_secgroup_rule_default(self):
1267 self.assertTrue(openstack_utils.
1268 create_secgroup_rule(self.neutron_client,
1274 self.assertTrue(openstack_utils.
1275 create_secgroup_rule(self.neutron_client,
# With logger.error patched, expects
# create_secgroup_rule(self.neutron_client, ...) to be falsy; the remaining
# arguments are not visible in this excerpt.
# NOTE(review): no assertion on mock_logger_error is visible here - confirm
# against the full file whether one exists.
1280 @mock.patch('functest.utils.openstack_utils.logger.error')
1281 def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error):
1282 self.assertFalse(openstack_utils.
1283 create_secgroup_rule(self.neutron_client,
# With logger.error patched, passes the Exception class in place of a client
# to create_secgroup_rule and expects a falsy result; the remaining
# arguments are not visible in this excerpt.
# NOTE(review): no assertion on mock_logger_error is visible here - confirm
# against the full file whether one exists.
1289 @mock.patch('functest.utils.openstack_utils.logger.error')
1290 def test_create_secgroup_rule_exception(self, mock_logger_error):
1291 self.assertFalse(openstack_utils.
1292 create_secgroup_rule(Exception,
# With logger.info patched and get_security_group_id patched to return
# 'sg_id', compares create_security_group_full(self.neutron_client, ...)
# with an expected value; remaining arguments and the expected value are
# not visible in this excerpt.
# NOTE(review): the final assertTrue receives the mock object itself, which
# is always truthy, so it cannot fail; the other tests in this file check
# `.called` instead.
1297 @mock.patch('functest.utils.openstack_utils.logger.info')
1298 def test_create_security_group_full_default(self, mock_logger_info):
1299 with mock.patch('functest.utils.openstack_utils.'
1300 'get_security_group_id',
1301 return_value='sg_id'):
1302 self.assertEqual(openstack_utils.
1303 create_security_group_full(self.neutron_client,
1307 self.assertTrue(mock_logger_info)
# With logger.info and logger.error patched, get_security_group_id patched
# (its return value is on a line not visible here) and create_security_group
# patched to return False, compares
# create_security_group_full(self.neutron_client, ...) with an expected
# value that is not visible in this excerpt.
# NOTE(review): both trailing assertTrue calls receive the mock objects
# themselves, which are always truthy, so they cannot fail; the other tests
# in this file check `.called` instead.
1309 @mock.patch('functest.utils.openstack_utils.logger.info')
1310 @mock.patch('functest.utils.openstack_utils.logger.error')
1311 def test_create_security_group_full_sec_group_fail(self,
1314 with mock.patch('functest.utils.openstack_utils.'
1315 'get_security_group_id',
1317 mock.patch('functest.utils.openstack_utils.'
1318 'create_security_group',
1319 return_value=False):
1320 self.assertEqual(openstack_utils.
1321 create_security_group_full(self.neutron_client,
1325 self.assertTrue(mock_logger_error)
1326 self.assertTrue(mock_logger_info)
# With logger.debug/info/error patched, get_security_group_id patched (its
# return value is on a line not visible here), create_security_group patched
# to return {'id': 'sg_id', 'name': 'sg_name'} and create_secgroup_rule
# patched to return False, compares
# create_security_group_full(self.neutron_client, ...) with an expected
# value that is not visible in this excerpt.
# NOTE(review): the three trailing assertTrue calls receive the mock objects
# themselves, which are always truthy, so they cannot fail; the other tests
# in this file check `.called` instead.
1328 @mock.patch('functest.utils.openstack_utils.logger.debug')
1329 @mock.patch('functest.utils.openstack_utils.logger.info')
1330 @mock.patch('functest.utils.openstack_utils.logger.error')
1331 def test_create_security_group_full_secgroup_rule_fail(self,
1335 with mock.patch('functest.utils.openstack_utils.'
1336 'get_security_group_id',
1338 mock.patch('functest.utils.openstack_utils.'
1339 'create_security_group',
1340 return_value={'id': 'sg_id',
1341 'name': 'sg_name'}), \
1342 mock.patch('functest.utils.openstack_utils.'
1343 'create_secgroup_rule',
1344 return_value=False):
1345 self.assertEqual(openstack_utils.
1346 create_security_group_full(self.neutron_client,
1350 self.assertTrue(mock_logger_error)
1351 self.assertTrue(mock_logger_info)
1352 self.assertTrue(mock_logger_debug)
# Expects add_secgroup_to_instance(self.nova_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1354 def test_add_secgroup_to_instance_default(self):
1355 self.assertTrue(openstack_utils.
1356 add_secgroup_to_instance(self.nova_client,
# Passes the Exception class in place of a client to
# add_secgroup_to_instance, expects a falsy result and a call to the
# patched logger.error. The remaining arguments are not visible here.
1360 @mock.patch('functest.utils.openstack_utils.logger.error')
1361 def test_add_secgroup_to_instance_exception(self, mock_logger_error):
1362 self.assertFalse(openstack_utils.
1363 add_secgroup_to_instance(Exception,
1366 self.assertTrue(mock_logger_error.called)
# Expects update_sg_quota(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1368 def test_update_sg_quota_default(self):
1369 self.assertTrue(openstack_utils.
1370 update_sg_quota(self.neutron_client,
# Passes the Exception class in place of a client to update_sg_quota,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1375 @mock.patch('functest.utils.openstack_utils.logger.error')
1376 def test_update_sg_quota_exception(self, mock_logger_error):
1377 self.assertFalse(openstack_utils.
1378 update_sg_quota(Exception,
1382 self.assertTrue(mock_logger_error.called)
# Expects delete_security_group(self.neutron_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1384 def test_delete_security_group_default(self):
1385 self.assertTrue(openstack_utils.
1386 delete_security_group(self.neutron_client,
# Passes the Exception class in place of a client to delete_security_group,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1389 @mock.patch('functest.utils.openstack_utils.logger.error')
1390 def test_delete_security_group_exception(self, mock_logger_error):
1391 self.assertFalse(openstack_utils.
1392 delete_security_group(Exception,
1394 self.assertTrue(mock_logger_error.called)
# Compares get_images(self.nova_client) with an expected value;
# the expected value is not visible in this excerpt.
1396 def test_get_images_default(self):
1397 self.assertEqual(openstack_utils.
1398 get_images(self.nova_client),
# Passes the Exception class in place of a client to get_images and checks
# that the patched logger.error was called.
# The expected return value is not visible in this excerpt.
1401 @mock.patch('functest.utils.openstack_utils.logger.error')
1402 def test_get_images_exception(self, mock_logger_error):
1403 self.assertEqual(openstack_utils.
1404 get_images(Exception),
1406 self.assertTrue(mock_logger_error.called)
# Compares get_image_id(self.glance_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1408 def test_get_image_id_default(self):
1409 self.assertEqual(openstack_utils.
1410 get_image_id(self.glance_client,
1414 # create_glance_image, get_or_create_image
# Patches a name under functest.utils.openstack_utils (the name itself is on
# a line not visible here) to return False, then compares
# create_glance_image(self.glance_client, ...) with an expected value that
# is not visible either, and checks that the patched logger.error was called.
1415 @mock.patch('functest.utils.openstack_utils.logger.error')
1416 def test_create_glance_image_file_present(self, mock_logger_error):
1417 with mock.patch('functest.utils.openstack_utils.'
1419 return_value=False):
1420 self.assertEqual(openstack_utils.
1421 create_glance_image(self.glance_client,
1425 self.assertTrue(mock_logger_error.called)
# Patches a name under functest.utils.openstack_utils (the name itself is on
# a line not visible here) to return True and get_image_id to return
# 'image_id', then compares create_glance_image(self.glance_client, ...)
# with an expected value that is not visible, and checks that the patched
# logger.info was called.
1427 @mock.patch('functest.utils.openstack_utils.logger.info')
1428 def test_create_glance_image_already_exist(self, mock_logger_info):
1429 with mock.patch('functest.utils.openstack_utils.'
1431 return_value=True), \
1432 mock.patch('functest.utils.openstack_utils.get_image_id',
1433 return_value='image_id'):
1434 self.assertEqual(openstack_utils.
1435 create_glance_image(self.glance_client,
1439 self.assertTrue(mock_logger_info.called)
# Patches a name under functest.utils.openstack_utils (not visible here) to
# return True, patches get_image_id (its configuration is not visible), and
# replaces the builtin open with mock_open(read_data='1'). After calling
# create_glance_image(self.glance_client, ...) it checks that open was
# called exactly once with 'file_path' and that logger.info was called.
# '__builtin__' is the Python 2 name of the builtins module, so this patch
# target only resolves on Python 2.
1441 @mock.patch('functest.utils.openstack_utils.logger.info')
1442 def test_create_glance_image_default(self, mock_logger_info):
1443 with mock.patch('functest.utils.openstack_utils.'
1445 return_value=True), \
1446 mock.patch('functest.utils.openstack_utils.get_image_id',
1448 mock.patch('__builtin__.open',
1449 mock.mock_open(read_data='1')) as m:
1450 self.assertEqual(openstack_utils.
1451 create_glance_image(self.glance_client,
1455 m.assert_called_once_with('file_path')
1456 self.assertTrue(mock_logger_info.called)
# Patches a name under functest.utils.openstack_utils (not visible here) to
# return True and makes get_image_id raise Exception, then compares
# create_glance_image(self.glance_client, ...) with an expected value that
# is not visible, and checks that the patched logger.error was called.
1458 @mock.patch('functest.utils.openstack_utils.logger.error')
1459 def test_create_glance_image_exception(self, mock_logger_error):
1460 with mock.patch('functest.utils.openstack_utils.'
1462 return_value=True), \
1463 mock.patch('functest.utils.openstack_utils.get_image_id',
1464 side_effect=Exception):
1465 self.assertEqual(openstack_utils.
1466 create_glance_image(self.glance_client,
1470 self.assertTrue(mock_logger_error.called)
# Expects delete_glance_image(self.nova_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1472 def test_delete_glance_image_default(self):
1473 self.assertTrue(openstack_utils.
1474 delete_glance_image(self.nova_client,
# Passes the Exception class in place of a client to delete_glance_image,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1477 @mock.patch('functest.utils.openstack_utils.logger.error')
1478 def test_delete_glance_image_exception(self, mock_logger_error):
1479 self.assertFalse(openstack_utils.
1480 delete_glance_image(Exception,
1482 self.assertTrue(mock_logger_error.called)
# Compares get_volumes(self.cinder_client) with an expected value;
# the expected value is not visible in this excerpt.
1484 def test_get_volumes_default(self):
1485 self.assertEqual(openstack_utils.
1486 get_volumes(self.cinder_client),
# Passes the Exception class in place of a client to get_volumes and checks
# that the patched logger.error was called.
# The expected return value is not visible in this excerpt.
1489 @mock.patch('functest.utils.openstack_utils.logger.error')
1490 def test_get_volumes_exception(self, mock_logger_error):
1491 self.assertEqual(openstack_utils.
1492 get_volumes(Exception),
1494 self.assertTrue(mock_logger_error.called)
# Expects list_volume_types(self.cinder_client, ...) to equal
# [self.volume_types[1]]; the filter arguments passed to the call are not
# visible in this excerpt.
1496 def test_list_volume_types_default_private(self):
1497 self.assertEqual(openstack_utils.
1498 list_volume_types(self.cinder_client,
1501 [self.volume_types[1]])
# Expects list_volume_types(self.cinder_client, ...) to equal
# [self.volume_types[0]]; the filter arguments passed to the call are not
# visible in this excerpt.
1503 def test_list_volume_types_default_public(self):
1504 self.assertEqual(openstack_utils.
1505 list_volume_types(self.cinder_client,
1508 [self.volume_types[0]])
# Passes the Exception class in place of a client to list_volume_types and
# checks that the patched logger.error was called.
# The expected return value is not visible in this excerpt.
1510 @mock.patch('functest.utils.openstack_utils.logger.error')
1511 def test_list_volume_types_exception(self, mock_logger_error):
1512 self.assertEqual(openstack_utils.
1513 list_volume_types(Exception),
1515 self.assertTrue(mock_logger_error.called)
def test_create_volume_type_default(self):
    """create_volume_type() on the cinder fixture yields volume_types[0]."""
    created = openstack_utils.create_volume_type(self.cinder_client,
                                                 'test_volume_type')
    expected = self.volume_types[0]
    self.assertEqual(created, expected)
# Passes the Exception class in place of a client to
# create_volume_type(..., 'test_volume_type') and checks that the patched
# logger.error was called.
# The expected return value is not visible in this excerpt.
1523 @mock.patch('functest.utils.openstack_utils.logger.error')
1524 def test_create_volume_type_exception(self, mock_logger_error):
1525 self.assertEqual(openstack_utils.
1526 create_volume_type(Exception,
1527 'test_volume_type'),
1529 self.assertTrue(mock_logger_error.called)
# Expects update_cinder_quota(self.cinder_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1531 def test_update_cinder_quota_default(self):
1532 self.assertTrue(openstack_utils.
1533 update_cinder_quota(self.cinder_client,
# Passes the Exception class in place of a client to update_cinder_quota,
# expects a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1539 @mock.patch('functest.utils.openstack_utils.logger.error')
1540 def test_update_cinder_quota_exception(self, mock_logger_error):
1541 self.assertFalse(openstack_utils.
1542 update_cinder_quota(Exception,
1547 self.assertTrue(mock_logger_error.called)
# Makes two delete_volume(self.cinder_client, ...) calls and expects each to
# be truthy; the argument lists that distinguish the two calls are not
# visible in this excerpt.
1549 def test_delete_volume_default(self):
1550 self.assertTrue(openstack_utils.
1551 delete_volume(self.cinder_client,
1555 self.assertTrue(openstack_utils.
1556 delete_volume(self.cinder_client,
# Passes the Exception class in place of a client to delete_volume, expects
# a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1560 @mock.patch('functest.utils.openstack_utils.logger.error')
1561 def test_delete_volume_exception(self, mock_logger_error):
1562 self.assertFalse(openstack_utils.
1563 delete_volume(Exception,
1566 self.assertTrue(mock_logger_error.called)
def test_delete_volume_type_default(self):
    """delete_volume_type() on the cinder fixture returns a truthy value."""
    outcome = openstack_utils.delete_volume_type(self.cinder_client,
                                                 self.volume_types[0])
    self.assertTrue(outcome)
@mock.patch('functest.utils.openstack_utils.logger.error')
def test_delete_volume_type_exception(self, mock_logger_error):
    """delete_volume_type() given Exception as client is falsy and logs."""
    outcome = openstack_utils.delete_volume_type(Exception,
                                                 self.volume_types[0])
    self.assertFalse(outcome)
    # The failure must also have been reported through logger.error.
    self.assertTrue(mock_logger_error.called)
# Calls get_tenants(self.keystone_client) twice, first with is_keystone_v3
# patched to return True and then patched to return False; the expected
# values of both comparisons are not visible in this excerpt.
1580 def test_get_tenants_default(self):
1581 with mock.patch('functest.utils.openstack_utils.'
1582 'is_keystone_v3', return_value=True):
1583 self.assertEqual(openstack_utils.
1584 get_tenants(self.keystone_client),
1586 with mock.patch('functest.utils.openstack_utils.'
1587 'is_keystone_v3', return_value=False):
1588 self.assertEqual(openstack_utils.
1589 get_tenants(self.keystone_client),
# Passes the Exception class in place of a client to get_tenants and checks
# that the patched logger.error was called.
# The expected return value is not visible in this excerpt.
1592 @mock.patch('functest.utils.openstack_utils.logger.error')
1593 def test_get_tenants_exception(self, mock_logger_error):
1594 self.assertEqual(openstack_utils.
1595 get_tenants(Exception),
1597 self.assertTrue(mock_logger_error.called)
# Compares get_users(self.keystone_client) with an expected value;
# the expected value is not visible in this excerpt.
1599 def test_get_users_default(self):
1600 self.assertEqual(openstack_utils.
1601 get_users(self.keystone_client),
# Passes the Exception class in place of a client to get_users and checks
# that the patched logger.error was called.
# The expected return value is not visible in this excerpt.
1604 @mock.patch('functest.utils.openstack_utils.logger.error')
1605 def test_get_users_exception(self, mock_logger_error):
1606 self.assertEqual(openstack_utils.
1607 get_users(Exception),
1609 self.assertTrue(mock_logger_error.called)
# Compares get_tenant_id(self.keystone_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1611 def test_get_tenant_id_default(self):
1612 self.assertEqual(openstack_utils.
1613 get_tenant_id(self.keystone_client,
# Compares get_user_id(self.keystone_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1617 def test_get_user_id_default(self):
1618 self.assertEqual(openstack_utils.
1619 get_user_id(self.keystone_client,
# Compares get_role_id(self.keystone_client, ...) with an expected value;
# the remaining arguments and the expected value are not visible here.
1623 def test_get_role_id_default(self):
1624 self.assertEqual(openstack_utils.
1625 get_role_id(self.keystone_client,
# Calls create_tenant(self.keystone_client, ...) twice, first with
# is_keystone_v3 patched to return True and then patched to return False;
# the remaining arguments and expected values are not visible here.
1629 def test_create_tenant_default(self):
1630 with mock.patch('functest.utils.openstack_utils.'
1631 'is_keystone_v3', return_value=True):
1632 self.assertEqual(openstack_utils.
1633 create_tenant(self.keystone_client,
1637 with mock.patch('functest.utils.openstack_utils.'
1638 'is_keystone_v3', return_value=False):
1639 self.assertEqual(openstack_utils.
1640 create_tenant(self.keystone_client,
# Passes the Exception class in place of a client to create_tenant and
# checks that the patched logger.error was called. The remaining arguments
# and the expected return value are not visible in this excerpt.
1645 @mock.patch('functest.utils.openstack_utils.logger.error')
1646 def test_create_tenant_exception(self, mock_logger_error):
1647 self.assertEqual(openstack_utils.
1648 create_tenant(Exception,
1652 self.assertTrue(mock_logger_error.called)
# Calls create_user(self.keystone_client, ...) twice, first with
# is_keystone_v3 patched to return True and then patched to return False;
# the remaining arguments and expected values are not visible here.
1654 def test_create_user_default(self):
1655 with mock.patch('functest.utils.openstack_utils.'
1656 'is_keystone_v3', return_value=True):
1657 self.assertEqual(openstack_utils.
1658 create_user(self.keystone_client,
1664 with mock.patch('functest.utils.openstack_utils.'
1665 'is_keystone_v3', return_value=False):
1666 self.assertEqual(openstack_utils.
1667 create_user(self.keystone_client,
# Passes the Exception class in place of a client to create_user and checks
# that the patched logger.error was called. The remaining arguments and the
# expected return value are not visible in this excerpt.
1674 @mock.patch('functest.utils.openstack_utils.logger.error')
1675 def test_create_user_exception(self, mock_logger_error):
1676 self.assertEqual(openstack_utils.
1677 create_user(Exception,
1683 self.assertTrue(mock_logger_error.called)
# Expects add_role_user(self.keystone_client, ...) to be truthy both with
# is_keystone_v3 patched to return True and patched to return False;
# the remaining arguments are not visible in this excerpt.
1685 def test_add_role_user_default(self):
1686 with mock.patch('functest.utils.openstack_utils.'
1687 'is_keystone_v3', return_value=True):
1688 self.assertTrue(openstack_utils.
1689 add_role_user(self.keystone_client,
1694 with mock.patch('functest.utils.openstack_utils.'
1695 'is_keystone_v3', return_value=False):
1696 self.assertTrue(openstack_utils.
1697 add_role_user(self.keystone_client,
# Passes the Exception class in place of a client to add_role_user, expects
# a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1702 @mock.patch('functest.utils.openstack_utils.logger.error')
1703 def test_add_role_user_exception(self, mock_logger_error):
1704 self.assertFalse(openstack_utils.
1705 add_role_user(Exception,
1709 self.assertTrue(mock_logger_error.called)
# Expects delete_tenant(self.keystone_client, ...) to be truthy both with
# is_keystone_v3 patched to return True and patched to return False;
# the remaining arguments are not visible in this excerpt.
1711 def test_delete_tenant_default(self):
1712 with mock.patch('functest.utils.openstack_utils.'
1713 'is_keystone_v3', return_value=True):
1714 self.assertTrue(openstack_utils.
1715 delete_tenant(self.keystone_client,
1718 with mock.patch('functest.utils.openstack_utils.'
1719 'is_keystone_v3', return_value=False):
1720 self.assertTrue(openstack_utils.
1721 delete_tenant(self.keystone_client,
# Passes the Exception class in place of a client to delete_tenant, expects
# a falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1724 @mock.patch('functest.utils.openstack_utils.logger.error')
1725 def test_delete_tenant_exception(self, mock_logger_error):
1726 self.assertFalse(openstack_utils.
1727 delete_tenant(Exception,
1729 self.assertTrue(mock_logger_error.called)
# Expects delete_user(self.keystone_client, ...) to be truthy;
# the remaining arguments are not visible in this excerpt.
1731 def test_delete_user_default(self):
1732 self.assertTrue(openstack_utils.
1733 delete_user(self.keystone_client,
# Passes the Exception class in place of a client to delete_user, expects a
# falsy result and a call to the patched logger.error.
# The remaining arguments are not visible in this excerpt.
1736 @mock.patch('functest.utils.openstack_utils.logger.error')
1737 def test_delete_user_exception(self, mock_logger_error):
1738 self.assertFalse(openstack_utils.
1739 delete_user(Exception,
1741 self.assertTrue(mock_logger_error.called)
# With is_keystone_v3 patched to return True, compares
# get_resource(self.heat_client, ...) with an expected value; the remaining
# arguments and the expected value are not visible in this excerpt.
1743 def test_get_resource_default(self):
1744 with mock.patch('functest.utils.openstack_utils.'
1745 'is_keystone_v3', return_value=True):
1746 self.assertEqual(openstack_utils.
1747 get_resource(self.heat_client,
# Passes the Exception class in place of a client to get_resource and
# checks that the patched logger.error was called. The remaining arguments
# and the expected return value are not visible in this excerpt.
1753 @mock.patch('functest.utils.openstack_utils.logger.error')
1754 def test_get_resource_exception(self, mock_logger_error):
1755 self.assertEqual(openstack_utils.
1756 get_resource(Exception,
1759 self.assertTrue(mock_logger_error.called)
# Script entry point: when the module is executed directly, run every test
# in it through unittest with verbose (level 2) per-test output.
1762 if __name__ == "__main__":
1763 unittest.main(verbosity=2)