3 # All rights reserved. This program and the accompanying materials
4 # are made available under the terms of the Apache License, Version 2.0
5 # which accompanies this distribution, and is available at
6 # http://www.apache.org/licenses/LICENSE-2.0
15 from functest.utils import openstack_utils
18 class OSUtilsTesting(unittest.TestCase):
20 def _get_env_cred_dict(self, os_prefix=''):
21 return {'OS_USERNAME': os_prefix + 'username',
22 'OS_PASSWORD': os_prefix + 'password',
23 'OS_AUTH_URL': os_prefix + 'auth_url',
24 'OS_TENANT_NAME': os_prefix + 'tenant_name',
25 'OS_USER_DOMAIN_NAME': os_prefix + 'user_domain_name',
26 'OS_PROJECT_DOMAIN_NAME': os_prefix + 'project_domain_name',
27 'OS_PROJECT_NAME': os_prefix + 'project_name',
28 'OS_ENDPOINT_TYPE': os_prefix + 'endpoint_type',
29 'OS_REGION_NAME': os_prefix + 'region_name',
30 'OS_CACERT': os_prefix + 'https_cacert'}
32 def _get_os_env_vars(self):
33 return {'username': 'test_username', 'password': 'test_password',
34 'auth_url': 'test_auth_url', 'tenant_name': 'test_tenant_name',
35 'user_domain_name': 'test_user_domain_name',
36 'project_domain_name': 'test_project_domain_name',
37 'project_name': 'test_project_name',
38 'endpoint_type': 'test_endpoint_type',
39 'region_name': 'test_region_name',
40 'https_cacert': 'test_https_cacert'}
43 self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
44 self.tenant_name = 'test_tenant_name'
45 self.env_cred_dict = self._get_env_cred_dict()
46 self.os_environs = self._get_env_cred_dict(os_prefix='test_')
47 self.os_env_vars = self._get_os_env_vars()
49 mock_obj = mock.Mock()
50 attrs = {'name': 'test_flavor',
53 mock_obj.configure_mock(**attrs)
54 self.flavor = mock_obj
56 mock_obj = mock.Mock()
57 attrs = {'name': 'test_aggregate',
59 'hosts': ['host_name']}
60 mock_obj.configure_mock(**attrs)
61 self.aggregate = mock_obj
63 mock_obj = mock.Mock()
64 attrs = {'id': 'instance_id',
65 'name': 'test_instance',
67 mock_obj.configure_mock(**attrs)
68 self.instance = mock_obj
70 mock_obj = mock.Mock()
71 attrs = {'id': 'azone_id',
72 'zoneName': 'test_azone',
74 mock_obj.configure_mock(**attrs)
75 self.availability_zone = mock_obj
77 mock_obj = mock.Mock()
78 attrs = {'id': 'floating_id',
79 'zoneName': 'test_floating_ip',
81 mock_obj.configure_mock(**attrs)
82 self.floating_ip = mock_obj
84 mock_obj = mock.Mock()
85 attrs = {'id': 'hypervisor_id',
86 'hypervisor_hostname': 'test_hostname',
88 mock_obj.configure_mock(**attrs)
89 self.hypervisor = mock_obj
91 mock_obj = mock.Mock()
92 attrs = {'id': 'image_id',
94 mock_obj.configure_mock(**attrs)
97 mock_obj = mock.Mock()
98 self.mock_return = mock_obj
100 self.nova_client = mock.Mock()
101 attrs = {'servers.list.return_value': [self.instance],
102 'servers.get.return_value': self.instance,
103 'servers.find.return_value': self.instance,
104 'servers.create.return_value': self.instance,
105 'flavors.list.return_value': [self.flavor],
106 'flavors.find.return_value': self.flavor,
107 'servers.add_floating_ip.return_value': mock.Mock(),
108 'servers.force_delete.return_value': mock.Mock(),
109 'aggregates.list.return_value': [self.aggregate],
110 'aggregates.add_host.return_value': mock.Mock(),
111 'aggregates.remove_host.return_value': mock.Mock(),
112 'aggregates.get.return_value': self.aggregate,
113 'aggregates.delete.return_value': mock.Mock(),
114 'availability_zones.list.return_value':
115 [self.availability_zone],
116 'floating_ips.list.return_value': [self.floating_ip],
117 'floating_ips.delete.return_value': mock.Mock(),
118 'hypervisors.list.return_value': [self.hypervisor],
119 'create.return_value': mock.Mock(),
120 'add_security_group.return_value': mock.Mock(),
121 'images.list.return_value': [self.image],
122 'images.delete.return_value': mock.Mock(),
124 self.nova_client.configure_mock(**attrs)
126 self.glance_client = mock.Mock()
127 attrs = {'images.list.return_value': [self.image],
128 'images.create.return_value': self.image,
129 'images.upload.return_value': mock.Mock()}
130 self.glance_client.configure_mock(**attrs)
132 mock_obj = mock.Mock()
133 attrs = {'id': 'volume_id',
134 'name': 'test_volume'}
135 mock_obj.configure_mock(**attrs)
136 self.volume = mock_obj
138 mock_obj = mock.Mock()
139 attrs = {'id': 'volume_type_id',
140 'name': 'test_volume_type',
142 mock_obj.configure_mock(**attrs)
143 self.volume_types = [mock_obj]
145 mock_obj = mock.Mock()
146 attrs = {'id': 'volume_type_id',
147 'name': 'test_volume_type',
149 mock_obj.configure_mock(**attrs)
150 self.volume_types.append(mock_obj)
152 self.cinder_client = mock.Mock()
153 attrs = {'volumes.list.return_value': [self.volume],
154 'volume_types.list.return_value': self.volume_types,
155 'volume_types.create.return_value': self.volume_types[0],
156 'volume_types.delete.return_value': mock.Mock(),
157 'quotas.update.return_value': mock.Mock(),
158 'volumes.detach.return_value': mock.Mock(),
159 'volumes.force_delete.return_value': mock.Mock(),
160 'volumes.delete.return_value': mock.Mock()
162 self.cinder_client.configure_mock(**attrs)
164 self.resource = mock.Mock()
165 attrs = {'id': 'resource_test_id',
166 'name': 'resource_test_name'
169 self.heat_client = mock.Mock()
170 attrs = {'resources.get.return_value': self.resource}
171 self.heat_client.configure_mock(**attrs)
173 mock_obj = mock.Mock()
174 attrs = {'id': 'tenant_id',
175 'name': 'test_tenant'}
176 mock_obj.configure_mock(**attrs)
177 self.tenant = mock_obj
179 mock_obj = mock.Mock()
180 attrs = {'id': 'user_id',
182 mock_obj.configure_mock(**attrs)
185 mock_obj = mock.Mock()
186 attrs = {'id': 'role_id',
188 mock_obj.configure_mock(**attrs)
191 self.keystone_client = mock.Mock()
192 attrs = {'projects.list.return_value': [self.tenant],
193 'tenants.list.return_value': [self.tenant],
194 'users.list.return_value': [self.user],
195 'roles.list.return_value': [self.role],
196 'projects.create.return_value': self.tenant,
197 'tenants.create.return_value': self.tenant,
198 'users.create.return_value': self.user,
199 'roles.grant.return_value': mock.Mock(),
200 'roles.add_user_role.return_value': mock.Mock(),
201 'projects.delete.return_value': mock.Mock(),
202 'tenants.delete.return_value': mock.Mock(),
203 'users.delete.return_value': mock.Mock(),
205 self.keystone_client.configure_mock(**attrs)
207 self.router = {'id': 'router_id',
208 'name': 'test_router'}
210 self.subnet = {'id': 'subnet_id',
211 'name': 'test_subnet'}
213 self.networks = [{'id': 'network_id',
214 'name': 'test_network',
215 'router:external': False,
217 'subnets': [self.subnet]},
218 {'id': 'network_id1',
219 'name': 'test_network1',
220 'router:external': True,
222 'subnets': [self.subnet]}]
224 self.port = {'id': 'port_id',
227 self.sec_group = {'id': 'sec_group_id',
228 'name': 'test_sec_group'}
230 self.sec_group_rule = {'id': 'sec_group_rule_id',
231 'direction': 'direction',
232 'protocol': 'protocol',
233 'port_range_max': 'port_max',
234 'security_group_id': self.sec_group['id'],
235 'port_range_min': 'port_min'}
236 self.neutron_floatingip = {'id': 'fip_id',
237 'floating_ip_address': 'test_ip'}
238 self.neutron_client = mock.Mock()
239 attrs = {'list_networks.return_value': {'networks': self.networks},
240 'list_routers.return_value': {'routers': [self.router]},
241 'list_ports.return_value': {'ports': [self.port]},
242 'list_subnets.return_value': {'subnets': [self.subnet]},
243 'create_network.return_value': {'network': self.networks[0]},
244 'create_subnet.return_value': {'subnets': [self.subnet]},
245 'create_router.return_value': {'router': self.router},
246 'create_port.return_value': {'port': self.port},
247 'create_floatingip.return_value': {'floatingip':
248 self.neutron_floatingip},
249 'update_network.return_value': mock.Mock(),
250 'update_port.return_value': {'port': self.port},
251 'add_interface_router.return_value': mock.Mock(),
252 'add_gateway_router.return_value': mock.Mock(),
253 'delete_network.return_value': mock.Mock(),
254 'delete_subnet.return_value': mock.Mock(),
255 'delete_router.return_value': mock.Mock(),
256 'delete_port.return_value': mock.Mock(),
257 'remove_interface_router.return_value': mock.Mock(),
258 'remove_gateway_router.return_value': mock.Mock(),
259 'create_bgpvpn.return_value': self.mock_return,
260 'create_network_association.return_value': self.mock_return,
261 'create_router_association.return_value': self.mock_return,
262 'update_bgpvpn.return_value': self.mock_return,
263 'delete_bgpvpn.return_value': self.mock_return,
264 'show_bgpvpn.return_value': self.mock_return,
265 'list_security_groups.return_value': {'security_groups':
267 'list_security_group_rules.'
268 'return_value': {'security_group_rules':
269 [self.sec_group_rule]},
270 'create_security_group_rule.return_value': mock.Mock(),
271 'create_security_group.return_value': {'security_group':
273 'update_quota.return_value': mock.Mock(),
274 'delete_security_group.return_value': mock.Mock()
276 self.neutron_client.configure_mock(**attrs)
278 self.empty_client = mock.Mock()
279 attrs = {'list_networks.return_value': {'networks': []},
280 'list_routers.return_value': {'routers': []},
281 'list_ports.return_value': {'ports': []},
282 'list_subnets.return_value': {'subnets': []}}
283 self.empty_client.configure_mock(**attrs)
285 @mock.patch('functest.utils.openstack_utils.os.getenv',
287 def test_is_keystone_v3_missing_identity(self, mock_os_getenv):
288 self.assertEqual(openstack_utils.is_keystone_v3(), False)
290 @mock.patch('functest.utils.openstack_utils.os.getenv',
292 def test_is_keystone_v3_default(self, mock_os_getenv):
293 self.assertEqual(openstack_utils.is_keystone_v3(), True)
295 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
297 def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env):
298 exp_resp = self.env_vars
299 exp_resp.extend(['OS_TENANT_NAME'])
300 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
302 @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
304 def test_get_rc_env_vars_default(self, mock_get_rc_env):
305 exp_resp = self.env_vars
306 exp_resp.extend(['OS_PROJECT_NAME',
307 'OS_USER_DOMAIN_NAME',
308 'OS_PROJECT_DOMAIN_NAME'])
309 self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
311 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
312 def test_check_credentials_missing_env(self, mock_get_rc_env):
313 exp_resp = self.env_vars
314 exp_resp.extend(['OS_TENANT_NAME'])
315 mock_get_rc_env.return_value = exp_resp
316 with mock.patch.dict('functest.utils.openstack_utils.os.environ', {},
318 self.assertEqual(openstack_utils.check_credentials(), False)
320 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
321 def test_check_credentials_default(self, mock_get_rc_env):
322 exp_resp = ['OS_TENANT_NAME']
323 mock_get_rc_env.return_value = exp_resp
324 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
325 {'OS_TENANT_NAME': self.tenant_name},
327 self.assertEqual(openstack_utils.check_credentials(), True)
329 def test_get_env_cred_dict(self):
330 self.assertDictEqual(openstack_utils.get_env_cred_dict(),
333 @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
334 def test_get_credentials_default(self, mock_get_rc_env):
335 mock_get_rc_env.return_value = self.env_cred_dict.keys()
336 with mock.patch.dict('functest.utils.openstack_utils.os.environ',
339 self.assertDictEqual(openstack_utils.get_credentials(),
342 def _get_credentials_missing_env(self, var):
343 dic = copy.deepcopy(self.os_environs)
345 with mock.patch('functest.utils.openstack_utils.get_rc_env_vars',
346 return_value=self.env_cred_dict.keys()), \
347 mock.patch.dict('functest.utils.openstack_utils.os.environ',
350 self.assertRaises(openstack_utils.MissingEnvVar,
351 lambda: openstack_utils.get_credentials())
def test_get_credentials_missing_username(self):
    # Run the shared missing-variable check for the user name variable.
    missing_var = 'OS_USERNAME'
    self._get_credentials_missing_env(missing_var)
def test_get_credentials_missing_password(self):
    # Run the shared missing-variable check for the password variable.
    missing_var = 'OS_PASSWORD'
    self._get_credentials_missing_env(missing_var)
def test_get_credentials_missing_auth_url(self):
    # Run the shared missing-variable check for the auth URL variable.
    missing_var = 'OS_AUTH_URL'
    self._get_credentials_missing_env(missing_var)
def test_get_credentials_missing_tenantname(self):
    # Run the shared missing-variable check for the tenant name variable.
    missing_var = 'OS_TENANT_NAME'
    self._get_credentials_missing_env(missing_var)
def test_get_credentials_missing_domainname(self):
    # Run the shared missing-variable check for the user domain variable.
    missing_var = 'OS_USER_DOMAIN_NAME'
    self._get_credentials_missing_env(missing_var)
def test_get_credentials_missing_projectname(self):
    # Run the shared missing-variable check for the project name variable.
    missing_var = 'OS_PROJECT_NAME'
    self._get_credentials_missing_env(missing_var)
def test_get_credentials_missing_endpoint_type(self):
    # Run the shared missing-variable check for the endpoint type variable.
    missing_var = 'OS_ENDPOINT_TYPE'
    self._get_credentials_missing_env(missing_var)
374 def _test_source_credentials(self, msg, key='OS_TENANT_NAME',
381 with mock.patch('__builtin__.open', mock.mock_open(read_data=msg),
383 m.return_value.__iter__ = lambda self: iter(self.readline, '')
384 openstack_utils.source_credentials(f)
385 m.assert_called_once_with(f, 'r')
386 self.assertEqual(os.environ[key], value)
def test_source_credentials(self):
    # Assignment lines checked with the helper's default key and value,
    # with and without "export", spaces around "=" and double quotes.
    assignment_lines = (
        'OS_TENANT_NAME=admin',
        'OS_TENANT_NAME= admin',
        'OS_TENANT_NAME = admin',
        'OS_TENANT_NAME = "admin"',
        'export OS_TENANT_NAME=admin',
        'export OS_TENANT_NAME =admin',
        'export OS_TENANT_NAME = admin',
        'export OS_TENANT_NAME = "admin"',
    )
    for rc_line in assignment_lines:
        self._test_source_credentials(rc_line)

    # Lines without any "=": the expected value is the empty string.
    for rc_line in ('OS_TENANT_NAME', 'export OS_TENANT_NAME'):
        self._test_source_credentials(rc_line, value='')

    # This test will fail as soon as rc_file is fixed
    nested_quotes_line = 'export "\'OS_TENANT_NAME\'" = "\'admin\'"'
    self._test_source_credentials(nested_quotes_line)
403 @mock.patch('functest.utils.openstack_utils.os.getenv',
405 def test_get_keystone_client_version_missing_env(self, mock_os_getenv):
406 self.assertEqual(openstack_utils.get_keystone_client_version(),
407 openstack_utils.DEFAULT_API_VERSION)
409 @mock.patch('functest.utils.openstack_utils.logger.info')
410 @mock.patch('functest.utils.openstack_utils.os.getenv',
412 def test_get_keystone_client_version_default(self, mock_os_getenv,
414 self.assertEqual(openstack_utils.get_keystone_client_version(),
416 mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is "
417 "set in env as '%s'", '3')
419 @mock.patch('functest.utils.openstack_utils.get_session')
420 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
421 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
423 @mock.patch('functest.utils.openstack_utils.os.getenv',
424 return_value='public')
425 def test_get_keystone_client_with_interface(self, mock_os_getenv,
426 mock_keystoneclient_version,
429 mock_keystone_obj = mock.Mock()
430 mock_session_obj = mock.Mock()
431 mock_key_client.return_value = mock_keystone_obj
432 mock_get_session.return_value = mock_session_obj
433 self.assertEqual(openstack_utils.get_keystone_client(),
435 mock_key_client.assert_called_once_with('3',
436 session=mock_session_obj,
439 @mock.patch('functest.utils.openstack_utils.get_session')
440 @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
441 @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
443 @mock.patch('functest.utils.openstack_utils.os.getenv',
444 return_value='admin')
445 def test_get_keystone_client_no_interface(self, mock_os_getenv,
446 mock_keystoneclient_version,
449 mock_keystone_obj = mock.Mock()
450 mock_session_obj = mock.Mock()
451 mock_key_client.return_value = mock_keystone_obj
452 mock_get_session.return_value = mock_session_obj
453 self.assertEqual(openstack_utils.get_keystone_client(),
455 mock_key_client.assert_called_once_with('3',
456 session=mock_session_obj,
459 @mock.patch('functest.utils.openstack_utils.os.getenv',
461 def test_get_nova_client_version_missing_env(self, mock_os_getenv):
462 self.assertEqual(openstack_utils.get_nova_client_version(),
463 openstack_utils.DEFAULT_API_VERSION)
465 @mock.patch('functest.utils.openstack_utils.logger.info')
466 @mock.patch('functest.utils.openstack_utils.os.getenv',
468 def test_get_nova_client_version_default(self, mock_os_getenv,
470 self.assertEqual(openstack_utils.get_nova_client_version(),
472 mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is "
473 "set in env as '%s'", '3')
475 def test_get_nova_client(self):
476 mock_nova_obj = mock.Mock()
477 mock_session_obj = mock.Mock()
478 with mock.patch('functest.utils.openstack_utils'
479 '.get_nova_client_version', return_value='3'), \
480 mock.patch('functest.utils.openstack_utils'
481 '.novaclient.Client',
482 return_value=mock_nova_obj) \
483 as mock_nova_client, \
484 mock.patch('functest.utils.openstack_utils.get_session',
485 return_value=mock_session_obj):
486 self.assertEqual(openstack_utils.get_nova_client(),
488 mock_nova_client.assert_called_once_with('3',
489 session=mock_session_obj)
491 @mock.patch('functest.utils.openstack_utils.os.getenv',
493 def test_get_cinder_client_version_missing_env(self, mock_os_getenv):
494 self.assertEqual(openstack_utils.get_cinder_client_version(),
495 openstack_utils.DEFAULT_API_VERSION)
497 @mock.patch('functest.utils.openstack_utils.logger.info')
498 @mock.patch('functest.utils.openstack_utils.os.getenv',
500 def test_get_cinder_client_version_default(self, mock_os_getenv,
502 self.assertEqual(openstack_utils.get_cinder_client_version(),
504 mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is "
505 "set in env as '%s'", '3')
507 def test_get_cinder_client(self):
508 mock_cinder_obj = mock.Mock()
509 mock_session_obj = mock.Mock()
510 with mock.patch('functest.utils.openstack_utils'
511 '.get_cinder_client_version', return_value='3'), \
512 mock.patch('functest.utils.openstack_utils'
513 '.cinderclient.Client',
514 return_value=mock_cinder_obj) \
515 as mock_cind_client, \
516 mock.patch('functest.utils.openstack_utils.get_session',
517 return_value=mock_session_obj):
518 self.assertEqual(openstack_utils.get_cinder_client(),
520 mock_cind_client.assert_called_once_with('3',
521 session=mock_session_obj)
523 @mock.patch('functest.utils.openstack_utils.os.getenv',
525 def test_get_neutron_client_version_missing_env(self, mock_os_getenv):
526 self.assertEqual(openstack_utils.get_neutron_client_version(),
527 openstack_utils.DEFAULT_API_VERSION)
529 @mock.patch('functest.utils.openstack_utils.logger.info')
530 @mock.patch('functest.utils.openstack_utils.os.getenv',
532 def test_get_neutron_client_version_default(self, mock_os_getenv,
534 self.assertEqual(openstack_utils.get_neutron_client_version(),
536 mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is "
537 "set in env as '%s'", '3')
539 def test_get_neutron_client(self):
540 mock_neutron_obj = mock.Mock()
541 mock_session_obj = mock.Mock()
542 with mock.patch('functest.utils.openstack_utils'
543 '.get_neutron_client_version', return_value='3'), \
544 mock.patch('functest.utils.openstack_utils'
545 '.neutronclient.Client',
546 return_value=mock_neutron_obj) \
547 as mock_neut_client, \
548 mock.patch('functest.utils.openstack_utils.get_session',
549 return_value=mock_session_obj):
550 self.assertEqual(openstack_utils.get_neutron_client(),
552 mock_neut_client.assert_called_once_with('3',
553 session=mock_session_obj)
555 @mock.patch('functest.utils.openstack_utils.os.getenv',
557 def test_get_glance_client_version_missing_env(self, mock_os_getenv):
558 self.assertEqual(openstack_utils.get_glance_client_version(),
559 openstack_utils.DEFAULT_API_VERSION)
561 @mock.patch('functest.utils.openstack_utils.logger.info')
562 @mock.patch('functest.utils.openstack_utils.os.getenv',
564 def test_get_glance_client_version_default(self, mock_os_getenv,
566 self.assertEqual(openstack_utils.get_glance_client_version(),
568 mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is "
569 "set in env as '%s'", '3')
571 def test_get_glance_client(self):
572 mock_glance_obj = mock.Mock()
573 mock_session_obj = mock.Mock()
574 with mock.patch('functest.utils.openstack_utils'
575 '.get_glance_client_version', return_value='3'), \
576 mock.patch('functest.utils.openstack_utils'
577 '.glanceclient.Client',
578 return_value=mock_glance_obj) \
579 as mock_glan_client, \
580 mock.patch('functest.utils.openstack_utils.get_session',
581 return_value=mock_session_obj):
582 self.assertEqual(openstack_utils.get_glance_client(),
584 mock_glan_client.assert_called_once_with('3',
585 session=mock_session_obj)
587 @mock.patch('functest.utils.openstack_utils.os.getenv',
589 def test_get_heat_client_version_missing_env(self, mock_os_getenv):
590 self.assertEqual(openstack_utils.get_heat_client_version(),
591 openstack_utils.DEFAULT_HEAT_API_VERSION)
593 @mock.patch('functest.utils.openstack_utils.logger.info')
594 @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1')
595 def test_get_heat_client_version_default(self, mock_os_getenv,
597 self.assertEqual(openstack_utils.get_heat_client_version(), '1')
598 mock_logger_info.assert_called_once_with(
599 "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1')
601 def test_get_heat_client(self):
602 mock_heat_obj = mock.Mock()
603 mock_session_obj = mock.Mock()
604 with mock.patch('functest.utils.openstack_utils'
605 '.get_heat_client_version', return_value='1'), \
606 mock.patch('functest.utils.openstack_utils'
607 '.heatclient.Client',
608 return_value=mock_heat_obj) \
609 as mock_heat_client, \
610 mock.patch('functest.utils.openstack_utils.get_session',
611 return_value=mock_session_obj):
612 self.assertEqual(openstack_utils.get_heat_client(),
614 mock_heat_client.assert_called_once_with('1',
615 session=mock_session_obj)
617 def test_get_instances_default(self):
618 self.assertEqual(openstack_utils.get_instances(self.nova_client),
621 @mock.patch('functest.utils.openstack_utils.logger.error')
622 def test_get_instances_exception(self, mock_logger_error):
623 self.assertEqual(openstack_utils.
624 get_instances(Exception),
626 self.assertTrue(mock_logger_error.called)
628 def test_get_instance_status_default(self):
629 self.assertEqual(openstack_utils.get_instance_status(self.nova_client,
633 @mock.patch('functest.utils.openstack_utils.logger.error')
634 def test_get_instance_status_exception(self, mock_logger_error):
635 self.assertEqual(openstack_utils.
636 get_instance_status(Exception,
639 self.assertTrue(mock_logger_error.called)
641 def test_get_instance_by_name_default(self):
642 self.assertEqual(openstack_utils.
643 get_instance_by_name(self.nova_client,
647 @mock.patch('functest.utils.openstack_utils.logger.error')
648 def test_get_instance_by_name_exception(self, mock_logger_error):
649 self.assertEqual(openstack_utils.
650 get_instance_by_name(Exception,
653 self.assertTrue(mock_logger_error.called)
655 def test_get_flavor_id_default(self):
656 self.assertEqual(openstack_utils.
657 get_flavor_id(self.nova_client,
661 def test_get_flavor_id_by_ram_range_default(self):
662 self.assertEqual(openstack_utils.
663 get_flavor_id_by_ram_range(self.nova_client,
667 def test_get_aggregates_default(self):
668 self.assertEqual(openstack_utils.
669 get_aggregates(self.nova_client),
672 @mock.patch('functest.utils.openstack_utils.logger.error')
673 def test_get_aggregates_exception(self, mock_logger_error):
674 self.assertEqual(openstack_utils.
675 get_aggregates(Exception),
677 self.assertTrue(mock_logger_error.called)
679 def test_get_aggregate_id_default(self):
680 with mock.patch('functest.utils.openstack_utils.get_aggregates',
681 return_value=[self.aggregate]):
682 self.assertEqual(openstack_utils.
683 get_aggregate_id(self.nova_client,
687 @mock.patch('functest.utils.openstack_utils.logger.error')
688 def test_get_aggregate_id_exception(self, mock_logger_error):
689 self.assertEqual(openstack_utils.
690 get_aggregate_id(Exception,
693 self.assertTrue(mock_logger_error.called)
695 def test_get_availability_zone_names_default(self):
696 with mock.patch('functest.utils.openstack_utils'
697 '.get_availability_zones',
698 return_value=[self.availability_zone]):
699 self.assertEqual(openstack_utils.
700 get_availability_zone_names(self.nova_client),
703 @mock.patch('functest.utils.openstack_utils.logger.error')
704 def test_get_availability_zone_names_exception(self, mock_logger_error):
705 self.assertEqual(openstack_utils.
706 get_availability_zone_names(Exception),
708 self.assertTrue(mock_logger_error.called)
def test_get_availability_zones_default(self):
    # The result for self.nova_client must equal a one-element list
    # holding the availability zone fixture.
    zones = openstack_utils.get_availability_zones(self.nova_client)
    self.assertEqual(zones, [self.availability_zone])
715 @mock.patch('functest.utils.openstack_utils.logger.error')
716 def test_get_availability_zones_exception(self, mock_logger_error):
717 self.assertEqual(openstack_utils.
718 get_availability_zones(Exception),
720 self.assertTrue(mock_logger_error.called)
722 def test_get_floating_ips_default(self):
723 self.assertEqual(openstack_utils.
724 get_floating_ips(self.nova_client),
727 @mock.patch('functest.utils.openstack_utils.logger.error')
728 def test_get_floating_ips_exception(self, mock_logger_error):
729 self.assertEqual(openstack_utils.
730 get_floating_ips(Exception),
732 self.assertTrue(mock_logger_error.called)
734 def test_get_hypervisors_default(self):
735 self.assertEqual(openstack_utils.
736 get_hypervisors(self.nova_client),
739 @mock.patch('functest.utils.openstack_utils.logger.error')
740 def test_get_hypervisors_exception(self, mock_logger_error):
741 self.assertEqual(openstack_utils.
742 get_hypervisors(Exception),
744 self.assertTrue(mock_logger_error.called)
746 def test_create_aggregate_default(self):
747 self.assertTrue(openstack_utils.
748 create_aggregate(self.nova_client,
752 @mock.patch('functest.utils.openstack_utils.logger.error')
753 def test_create_aggregate_exception(self, mock_logger_error):
754 self.assertEqual(openstack_utils.
755 create_aggregate(Exception,
759 self.assertTrue(mock_logger_error.called)
761 def test_add_host_to_aggregate_default(self):
762 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
763 self.assertTrue(openstack_utils.
764 add_host_to_aggregate(self.nova_client,
768 @mock.patch('functest.utils.openstack_utils.logger.error')
769 def test_add_host_to_aggregate_exception(self, mock_logger_error):
770 self.assertEqual(openstack_utils.
771 add_host_to_aggregate(Exception,
775 self.assertTrue(mock_logger_error.called)
777 def test_create_aggregate_with_host_default(self):
778 with mock.patch('functest.utils.openstack_utils.create_aggregate'), \
779 mock.patch('functest.utils.openstack_utils.'
780 'add_host_to_aggregate'):
781 self.assertTrue(openstack_utils.
782 create_aggregate_with_host(self.nova_client,
787 @mock.patch('functest.utils.openstack_utils.logger.error')
788 def test_create_aggregate_with_host_exception(self, mock_logger_error):
789 with mock.patch('functest.utils.openstack_utils.create_aggregate',
790 side_effect=Exception):
791 self.assertEqual(openstack_utils.
792 create_aggregate_with_host(Exception,
797 self.assertTrue(mock_logger_error.called)
799 def test_create_instance_default(self):
800 with mock.patch('functest.utils.openstack_utils.'
802 return_value=self.nova_client):
803 self.assertEqual(openstack_utils.
804 create_instance('test_flavor',
809 @mock.patch('functest.utils.openstack_utils.logger.error')
810 def test_create_instance_exception(self, mock_logger_error):
811 with mock.patch('functest.utils.openstack_utils.'
813 return_value=self.nova_client):
814 self.nova_client.flavors.find.side_effect = Exception
815 self.assertEqual(openstack_utils.
816 create_instance('test_flavor',
820 self.assertTrue(mock_logger_error)
822 def test_create_floating_ip_default(self):
823 with mock.patch('functest.utils.openstack_utils.'
824 'get_external_net_id',
825 return_value='external_net_id'):
826 exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'}
827 self.assertEqual(openstack_utils.
828 create_floating_ip(self.neutron_client),
831 @mock.patch('functest.utils.openstack_utils.logger.error')
832 def test_create_floating_ip_exception(self, mock_logger_error):
833 with mock.patch('functest.utils.openstack_utils.'
834 'get_external_net_id',
835 return_value='external_net_id'):
836 self.assertEqual(openstack_utils.
837 create_floating_ip(Exception),
839 self.assertTrue(mock_logger_error)
841 def test_add_floating_ip_default(self):
842 with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
843 self.assertTrue(openstack_utils.
844 add_floating_ip(self.nova_client,
846 'test_floatingip_addr'))
848 @mock.patch('functest.utils.openstack_utils.logger.error')
849 def test_add_floating_ip_exception(self, mock_logger_error):
850 self.assertFalse(openstack_utils.
851 add_floating_ip(Exception,
853 'test_floatingip_addr'))
854 self.assertTrue(mock_logger_error.called)
856 def test_delete_instance_default(self):
857 self.assertTrue(openstack_utils.
858 delete_instance(self.nova_client,
861 @mock.patch('functest.utils.openstack_utils.logger.error')
862 def test_delete_instance_exception(self, mock_logger_error):
863 self.assertFalse(openstack_utils.
864 delete_instance(Exception,
866 self.assertTrue(mock_logger_error.called)
868 def test_delete_floating_ip_default(self):
869 self.assertTrue(openstack_utils.
870 delete_floating_ip(self.nova_client,
873 @mock.patch('functest.utils.openstack_utils.logger.error')
874 def test_delete_floating_ip_exception(self, mock_logger_error):
875 self.assertFalse(openstack_utils.
876 delete_floating_ip(Exception,
878 self.assertTrue(mock_logger_error.called)
880 def test_remove_host_from_aggregate_default(self):
881 with mock.patch('functest.utils.openstack_utils.'
883 self.assertTrue(openstack_utils.
884 remove_host_from_aggregate(self.nova_client,
888 @mock.patch('functest.utils.openstack_utils.logger.error')
889 def test_remove_host_from_aggregate_exception(self, mock_logger_error):
890 with mock.patch('functest.utils.openstack_utils.'
891 'get_aggregate_id', side_effect=Exception):
892 self.assertFalse(openstack_utils.
893 remove_host_from_aggregate(self.nova_client,
896 self.assertTrue(mock_logger_error.called)
898 def test_remove_hosts_from_aggregate_default(self):
899 with mock.patch('functest.utils.openstack_utils.'
900 'get_aggregate_id'), \
901 mock.patch('functest.utils.openstack_utils.'
902 'remove_host_from_aggregate',
905 openstack_utils.remove_hosts_from_aggregate(self.nova_client,
907 mock_method.assert_any_call(self.nova_client,
911 def test_delete_aggregate_default(self):
912 with mock.patch('functest.utils.openstack_utils.'
913 'remove_hosts_from_aggregate'):
914 self.assertTrue(openstack_utils.
915 delete_aggregate(self.nova_client,
918 @mock.patch('functest.utils.openstack_utils.logger.error')
919 def test_delete_aggregate_exception(self, mock_logger_error):
920 with mock.patch('functest.utils.openstack_utils.'
921 'remove_hosts_from_aggregate', side_effect=Exception):
922 self.assertFalse(openstack_utils.
923 delete_aggregate(self.nova_client,
925 self.assertTrue(mock_logger_error.called)
927 def test_get_network_list_default(self):
928 self.assertEqual(openstack_utils.
929 get_network_list(self.neutron_client),
932 def test_get_network_list_missing_network(self):
933 self.assertEqual(openstack_utils.
934 get_network_list(self.empty_client),
937 def test_get_router_list_default(self):
938 self.assertEqual(openstack_utils.
939 get_router_list(self.neutron_client),
942 def test_get_router_list_missing_router(self):
943 self.assertEqual(openstack_utils.
944 get_router_list(self.empty_client),
947 def test_get_port_list_default(self):
948 self.assertEqual(openstack_utils.
949 get_port_list(self.neutron_client),
952 def test_get_port_list_missing_port(self):
953 self.assertEqual(openstack_utils.
954 get_port_list(self.empty_client),
957 def test_get_network_id_default(self):
958 self.assertEqual(openstack_utils.
959 get_network_id(self.neutron_client,
963 def test_get_subnet_id_default(self):
964 self.assertEqual(openstack_utils.
965 get_subnet_id(self.neutron_client,
969 def test_get_router_id_default(self):
970 self.assertEqual(openstack_utils.
971 get_router_id(self.neutron_client,
975 def test_get_private_net_default(self):
976 self.assertEqual(openstack_utils.
977 get_private_net(self.neutron_client),
980 def test_get_private_net_missing_net(self):
981 self.assertEqual(openstack_utils.
982 get_private_net(self.empty_client),
985 def test_get_external_net_default(self):
986 self.assertEqual(openstack_utils.
987 get_external_net(self.neutron_client),
990 def test_get_external_net_missing_net(self):
991 self.assertEqual(openstack_utils.
992 get_external_net(self.empty_client),
995 def test_get_external_net_id_default(self):
996 self.assertEqual(openstack_utils.
997 get_external_net_id(self.neutron_client),
1000 def test_get_external_net_id_missing_net(self):
1001 self.assertEqual(openstack_utils.
1002 get_external_net_id(self.empty_client),
1005 def test_check_neutron_net_default(self):
1006 self.assertTrue(openstack_utils.
1007 check_neutron_net(self.neutron_client,
1010 def test_check_neutron_net_missing_net(self):
1011 self.assertFalse(openstack_utils.
1012 check_neutron_net(self.empty_client,
1015 def test_create_neutron_net_default(self):
1016 self.assertEqual(openstack_utils.
1017 create_neutron_net(self.neutron_client,
1021 @mock.patch('functest.utils.openstack_utils.logger.error')
1022 def test_create_neutron_net_exception(self, mock_logger_error):
1023 self.assertEqual(openstack_utils.
1024 create_neutron_net(Exception,
1027 self.assertTrue(mock_logger_error.called)
1029 def test_create_neutron_subnet_default(self):
1030 self.assertEqual(openstack_utils.
1031 create_neutron_subnet(self.neutron_client,
1037 @mock.patch('functest.utils.openstack_utils.logger.error')
1038 def test_create_neutron_subnet_exception(self, mock_logger_error):
1039 self.assertEqual(openstack_utils.
1040 create_neutron_subnet(Exception,
1045 self.assertTrue(mock_logger_error.called)
1047 def test_create_neutron_router_default(self):
1048 self.assertEqual(openstack_utils.
1049 create_neutron_router(self.neutron_client,
1053 @mock.patch('functest.utils.openstack_utils.logger.error')
1054 def test_create_neutron_router_exception(self, mock_logger_error):
1055 self.assertEqual(openstack_utils.
1056 create_neutron_router(Exception,
1059 self.assertTrue(mock_logger_error.called)
1061 def test_create_neutron_port_default(self):
1062 self.assertEqual(openstack_utils.
1063 create_neutron_port(self.neutron_client,
1069 @mock.patch('functest.utils.openstack_utils.logger.error')
1070 def test_create_neutron_port_exception(self, mock_logger_error):
1071 self.assertEqual(openstack_utils.
1072 create_neutron_port(Exception,
1077 self.assertTrue(mock_logger_error.called)
1079 def test_update_neutron_net_default(self):
1080 self.assertTrue(openstack_utils.
1081 update_neutron_net(self.neutron_client,
1084 @mock.patch('functest.utils.openstack_utils.logger.error')
1085 def test_update_neutron_net_exception(self, mock_logger_error):
1086 self.assertFalse(openstack_utils.
1087 update_neutron_net(Exception,
1089 self.assertTrue(mock_logger_error.called)
1091 def test_update_neutron_port_default(self):
1092 self.assertEqual(openstack_utils.
1093 update_neutron_port(self.neutron_client,
1098 @mock.patch('functest.utils.openstack_utils.logger.error')
1099 def test_update_neutron_port_exception(self, mock_logger_error):
1100 self.assertEqual(openstack_utils.
1101 update_neutron_port(Exception,
1105 self.assertTrue(mock_logger_error.called)
1107 def test_add_interface_router_default(self):
1108 self.assertTrue(openstack_utils.
1109 add_interface_router(self.neutron_client,
1113 @mock.patch('functest.utils.openstack_utils.logger.error')
1114 def test_add_interface_router_exception(self, mock_logger_error):
1115 self.assertFalse(openstack_utils.
1116 add_interface_router(Exception,
1119 self.assertTrue(mock_logger_error.called)
1121 def test_add_gateway_router_default(self):
1122 with mock.patch('functest.utils.openstack_utils.'
1123 'get_external_net_id',
1124 return_value='network_id'):
1125 self.assertTrue(openstack_utils.
1126 add_gateway_router(self.neutron_client,
1129 @mock.patch('functest.utils.openstack_utils.logger.error')
1130 def test_add_gateway_router_exception(self, mock_logger_error):
1131 with mock.patch('functest.utils.openstack_utils.'
1132 'get_external_net_id',
1133 return_value='network_id'):
1134 self.assertFalse(openstack_utils.
1135 add_gateway_router(Exception,
1137 self.assertTrue(mock_logger_error.called)
1139 def test_delete_neutron_net_default(self):
1140 self.assertTrue(openstack_utils.
1141 delete_neutron_net(self.neutron_client,
1144 @mock.patch('functest.utils.openstack_utils.logger.error')
1145 def test_delete_neutron_net_exception(self, mock_logger_error):
1146 self.assertFalse(openstack_utils.
1147 delete_neutron_net(Exception,
1149 self.assertTrue(mock_logger_error.called)
1151 def test_delete_neutron_subnet_default(self):
1152 self.assertTrue(openstack_utils.
1153 delete_neutron_subnet(self.neutron_client,
1156 @mock.patch('functest.utils.openstack_utils.logger.error')
1157 def test_delete_neutron_subnet_exception(self, mock_logger_error):
1158 self.assertFalse(openstack_utils.
1159 delete_neutron_subnet(Exception,
1161 self.assertTrue(mock_logger_error.called)
1163 def test_delete_neutron_router_default(self):
1164 self.assertTrue(openstack_utils.
1165 delete_neutron_router(self.neutron_client,
1168 @mock.patch('functest.utils.openstack_utils.logger.error')
1169 def test_delete_neutron_router_exception(self, mock_logger_error):
1170 self.assertFalse(openstack_utils.
1171 delete_neutron_router(Exception,
1173 self.assertTrue(mock_logger_error.called)
1175 def test_delete_neutron_port_default(self):
1176 self.assertTrue(openstack_utils.
1177 delete_neutron_port(self.neutron_client,
1180 @mock.patch('functest.utils.openstack_utils.logger.error')
1181 def test_delete_neutron_port_exception(self, mock_logger_error):
1182 self.assertFalse(openstack_utils.
1183 delete_neutron_port(Exception,
1185 self.assertTrue(mock_logger_error.called)
1187 def test_remove_interface_router_default(self):
1188 self.assertTrue(openstack_utils.
1189 remove_interface_router(self.neutron_client,
1193 @mock.patch('functest.utils.openstack_utils.logger.error')
1194 def test_remove_interface_router_exception(self, mock_logger_error):
1195 self.assertFalse(openstack_utils.
1196 remove_interface_router(Exception,
1199 self.assertTrue(mock_logger_error.called)
1201 def test_remove_gateway_router_default(self):
1202 self.assertTrue(openstack_utils.
1203 remove_gateway_router(self.neutron_client,
1206 @mock.patch('functest.utils.openstack_utils.logger.error')
1207 def test_remove_gateway_router_exception(self, mock_logger_error):
1208 self.assertFalse(openstack_utils.
1209 remove_gateway_router(Exception,
1211 self.assertTrue(mock_logger_error.called)
1213 def test_create_bgpvpn(self):
1214 self.assertEqual(openstack_utils.
1215 create_bgpvpn(self.neutron_client),
1218 def test_create_network_association(self):
1219 self.assertEqual(openstack_utils.
1220 create_network_association(self.neutron_client,
1225 def test_create_router_association(self):
1226 self.assertEqual(openstack_utils.
1227 create_router_association(self.neutron_client,
1232 def test_update_bgpvpn(self):
1233 self.assertEqual(openstack_utils.
1234 update_bgpvpn(self.neutron_client,
1238 def test_delete_bgpvpn(self):
1239 self.assertEqual(openstack_utils.
1240 delete_bgpvpn(self.neutron_client,
1244 def test_get_bgpvpn(self):
1245 self.assertEqual(openstack_utils.
1246 get_bgpvpn(self.neutron_client,
1250 def test_get_bgpvpn_routers(self):
1251 with mock.patch('functest.utils.openstack_utils.'
1253 return_value={'bgpvpn':
1254 {'routers': [self.router]}}):
1255 self.assertEqual(openstack_utils.
1256 get_bgpvpn_routers(self.neutron_client,
1260 def test_get_security_groups_default(self):
1261 self.assertEqual(openstack_utils.
1262 get_security_groups(self.neutron_client),
1265 @mock.patch('functest.utils.openstack_utils.logger.error')
1266 def test_get_security_groups_exception(self, mock_logger_error):
1267 self.assertEqual(openstack_utils.
1268 get_security_groups(Exception),
1270 self.assertTrue(mock_logger_error.called)
1272 def test_get_security_group_id_default(self):
1273 with mock.patch('functest.utils.openstack_utils.'
1274 'get_security_groups',
1275 return_value=[self.sec_group]):
1276 self.assertEqual(openstack_utils.
1277 get_security_group_id(self.neutron_client,
def test_get_security_group_rules_default(self):
    """Check the rules returned for the fixture security group.

    Calls ``get_security_group_rules`` with ``self.neutron_client`` and
    the ``'id'`` entry of ``self.sec_group``, and expects the result to
    equal a one-element list holding ``self.sec_group_rule``.
    """
    found_rules = openstack_utils.get_security_group_rules(
        self.neutron_client, self.sec_group['id'])
    expected_rules = [self.sec_group_rule]
    self.assertEqual(found_rules, expected_rules)
1287 @mock.patch('functest.utils.openstack_utils.logger.error')
1288 def test_get_security_group_rules_exception(self, mock_logger_error):
1289 self.assertEqual(openstack_utils.
1290 get_security_group_rules(Exception,
1293 self.assertTrue(mock_logger_error.called)
1295 def test_check_security_group_rules_not_exists(self):
1296 self.assertEqual(openstack_utils.
1297 check_security_group_rules(self.neutron_client,
1305 def test_check_security_group_rules_exists(self):
1306 self.assertEqual(openstack_utils.
1307 check_security_group_rules(self.neutron_client,
1308 self.sec_group['id'],
1315 @mock.patch('functest.utils.openstack_utils.logger.error')
1316 def test_check_security_group_rules_exception(self, mock_logger_error):
1317 self.assertEqual(openstack_utils.
1318 check_security_group_rules(Exception,
1325 self.assertTrue(mock_logger_error.called)
1327 def test_create_security_group_default(self):
1328 self.assertEqual(openstack_utils.
1329 create_security_group(self.neutron_client,
1334 @mock.patch('functest.utils.openstack_utils.logger.error')
1335 def test_create_security_group_exception(self, mock_logger_error):
1336 self.assertEqual(openstack_utils.
1337 create_security_group(Exception,
1341 self.assertTrue(mock_logger_error.called)
1343 def test_create_secgroup_rule_default(self):
1344 self.assertTrue(openstack_utils.
1345 create_secgroup_rule(self.neutron_client,
1351 self.assertTrue(openstack_utils.
1352 create_secgroup_rule(self.neutron_client,
1357 @mock.patch('functest.utils.openstack_utils.logger.error')
1358 def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error):
1359 self.assertFalse(openstack_utils.
1360 create_secgroup_rule(self.neutron_client,
1366 @mock.patch('functest.utils.openstack_utils.logger.error')
1367 def test_create_secgroup_rule_exception(self, mock_logger_error):
1368 self.assertFalse(openstack_utils.
1369 create_secgroup_rule(Exception,
1374 @mock.patch('functest.utils.openstack_utils.logger.info')
1375 def test_create_security_group_full_default(self, mock_logger_info):
1376 with mock.patch('functest.utils.openstack_utils.'
1377 'get_security_group_id',
1378 return_value='sg_id'):
1379 self.assertEqual(openstack_utils.
1380 create_security_group_full(self.neutron_client,
1384 self.assertTrue(mock_logger_info)
1386 @mock.patch('functest.utils.openstack_utils.logger.info')
1387 @mock.patch('functest.utils.openstack_utils.logger.error')
1388 def test_create_security_group_full_sec_group_fail(self,
1391 with mock.patch('functest.utils.openstack_utils.'
1392 'get_security_group_id',
1394 mock.patch('functest.utils.openstack_utils.'
1395 'create_security_group',
1396 return_value=False):
1397 self.assertEqual(openstack_utils.
1398 create_security_group_full(self.neutron_client,
1402 self.assertTrue(mock_logger_error)
1403 self.assertTrue(mock_logger_info)
1405 @mock.patch('functest.utils.openstack_utils.logger.debug')
1406 @mock.patch('functest.utils.openstack_utils.logger.info')
1407 @mock.patch('functest.utils.openstack_utils.logger.error')
1408 def test_create_security_group_full_secgroup_rule_fail(self,
1412 with mock.patch('functest.utils.openstack_utils.'
1413 'get_security_group_id',
1415 mock.patch('functest.utils.openstack_utils.'
1416 'create_security_group',
1417 return_value={'id': 'sg_id',
1418 'name': 'sg_name'}), \
1419 mock.patch('functest.utils.openstack_utils.'
1420 'create_secgroup_rule',
1421 return_value=False):
1422 self.assertEqual(openstack_utils.
1423 create_security_group_full(self.neutron_client,
1427 self.assertTrue(mock_logger_error)
1428 self.assertTrue(mock_logger_info)
1429 self.assertTrue(mock_logger_debug)
1431 def test_add_secgroup_to_instance_default(self):
1432 self.assertTrue(openstack_utils.
1433 add_secgroup_to_instance(self.nova_client,
1437 @mock.patch('functest.utils.openstack_utils.logger.error')
1438 def test_add_secgroup_to_instance_exception(self, mock_logger_error):
1439 self.assertFalse(openstack_utils.
1440 add_secgroup_to_instance(Exception,
1443 self.assertTrue(mock_logger_error.called)
1445 def test_update_sg_quota_default(self):
1446 self.assertTrue(openstack_utils.
1447 update_sg_quota(self.neutron_client,
1452 @mock.patch('functest.utils.openstack_utils.logger.error')
1453 def test_update_sg_quota_exception(self, mock_logger_error):
1454 self.assertFalse(openstack_utils.
1455 update_sg_quota(Exception,
1459 self.assertTrue(mock_logger_error.called)
1461 def test_delete_security_group_default(self):
1462 self.assertTrue(openstack_utils.
1463 delete_security_group(self.neutron_client,
1466 @mock.patch('functest.utils.openstack_utils.logger.error')
1467 def test_delete_security_group_exception(self, mock_logger_error):
1468 self.assertFalse(openstack_utils.
1469 delete_security_group(Exception,
1471 self.assertTrue(mock_logger_error.called)
1473 def test_get_images_default(self):
1474 self.assertEqual(openstack_utils.
1475 get_images(self.nova_client),
1478 @mock.patch('functest.utils.openstack_utils.logger.error')
1479 def test_get_images_exception(self, mock_logger_error):
1480 self.assertEqual(openstack_utils.
1481 get_images(Exception),
1483 self.assertTrue(mock_logger_error.called)
1485 def test_get_image_id_default(self):
1486 self.assertEqual(openstack_utils.
1487 get_image_id(self.glance_client,
1491 # create_glance_image, get_or_create_image
1492 @mock.patch('functest.utils.openstack_utils.logger.error')
1493 def test_create_glance_image_file_present(self, mock_logger_error):
1494 with mock.patch('functest.utils.openstack_utils.'
1496 return_value=False):
1497 self.assertEqual(openstack_utils.
1498 create_glance_image(self.glance_client,
1502 self.assertTrue(mock_logger_error.called)
1504 @mock.patch('functest.utils.openstack_utils.logger.info')
1505 def test_create_glance_image_already_exist(self, mock_logger_info):
1506 with mock.patch('functest.utils.openstack_utils.'
1508 return_value=True), \
1509 mock.patch('functest.utils.openstack_utils.get_image_id',
1510 return_value='image_id'):
1511 self.assertEqual(openstack_utils.
1512 create_glance_image(self.glance_client,
1516 self.assertTrue(mock_logger_info.called)
1518 @mock.patch('functest.utils.openstack_utils.logger.info')
1519 def test_create_glance_image_default(self, mock_logger_info):
1520 with mock.patch('functest.utils.openstack_utils.'
1522 return_value=True), \
1523 mock.patch('functest.utils.openstack_utils.get_image_id',
1525 mock.patch('__builtin__.open',
1526 mock.mock_open(read_data='1')) as m:
1527 self.assertEqual(openstack_utils.
1528 create_glance_image(self.glance_client,
1532 m.assert_called_once_with('file_path')
1533 self.assertTrue(mock_logger_info.called)
1535 @mock.patch('functest.utils.openstack_utils.logger.error')
1536 def test_create_glance_image_exception(self, mock_logger_error):
1537 with mock.patch('functest.utils.openstack_utils.'
1539 return_value=True), \
1540 mock.patch('functest.utils.openstack_utils.get_image_id',
1541 side_effect=Exception):
1542 self.assertEqual(openstack_utils.
1543 create_glance_image(self.glance_client,
1547 self.assertTrue(mock_logger_error.called)
1549 def test_delete_glance_image_default(self):
1550 self.assertTrue(openstack_utils.
1551 delete_glance_image(self.nova_client,
1554 @mock.patch('functest.utils.openstack_utils.logger.error')
1555 def test_delete_glance_image_exception(self, mock_logger_error):
1556 self.assertFalse(openstack_utils.
1557 delete_glance_image(Exception,
1559 self.assertTrue(mock_logger_error.called)
1561 def test_get_volumes_default(self):
1562 self.assertEqual(openstack_utils.
1563 get_volumes(self.cinder_client),
1566 @mock.patch('functest.utils.openstack_utils.logger.error')
1567 def test_get_volumes_exception(self, mock_logger_error):
1568 self.assertEqual(openstack_utils.
1569 get_volumes(Exception),
1571 self.assertTrue(mock_logger_error.called)
1573 def test_list_volume_types_default_private(self):
1574 self.assertEqual(openstack_utils.
1575 list_volume_types(self.cinder_client,
1578 [self.volume_types[1]])
1580 def test_list_volume_types_default_public(self):
1581 self.assertEqual(openstack_utils.
1582 list_volume_types(self.cinder_client,
1585 [self.volume_types[0]])
1587 @mock.patch('functest.utils.openstack_utils.logger.error')
1588 def test_list_volume_types_exception(self, mock_logger_error):
1589 self.assertEqual(openstack_utils.
1590 list_volume_types(Exception),
1592 self.assertTrue(mock_logger_error.called)
def test_create_volume_type_default(self):
    """Check the value returned when creating a volume type.

    Calls ``create_volume_type`` with ``self.cinder_client`` and the
    name ``'test_volume_type'``, and expects the result to equal the
    first entry of ``self.volume_types``.
    """
    created_type = openstack_utils.create_volume_type(
        self.cinder_client, 'test_volume_type')
    self.assertEqual(created_type, self.volume_types[0])
1600 @mock.patch('functest.utils.openstack_utils.logger.error')
1601 def test_create_volume_type_exception(self, mock_logger_error):
1602 self.assertEqual(openstack_utils.
1603 create_volume_type(Exception,
1604 'test_volume_type'),
1606 self.assertTrue(mock_logger_error.called)
1608 def test_update_cinder_quota_default(self):
1609 self.assertTrue(openstack_utils.
1610 update_cinder_quota(self.cinder_client,
1616 @mock.patch('functest.utils.openstack_utils.logger.error')
1617 def test_update_cinder_quota_exception(self, mock_logger_error):
1618 self.assertFalse(openstack_utils.
1619 update_cinder_quota(Exception,
1624 self.assertTrue(mock_logger_error.called)
1626 def test_delete_volume_default(self):
1627 self.assertTrue(openstack_utils.
1628 delete_volume(self.cinder_client,
1632 self.assertTrue(openstack_utils.
1633 delete_volume(self.cinder_client,
1637 @mock.patch('functest.utils.openstack_utils.logger.error')
1638 def test_delete_volume_exception(self, mock_logger_error):
1639 self.assertFalse(openstack_utils.
1640 delete_volume(Exception,
1643 self.assertTrue(mock_logger_error.called)
def test_delete_volume_type_default(self):
    """Check that deleting a volume type reports a truthy result.

    Calls ``delete_volume_type`` with ``self.cinder_client`` and the
    first entry of ``self.volume_types``.
    """
    deleted = openstack_utils.delete_volume_type(
        self.cinder_client, self.volume_types[0])
    self.assertTrue(deleted)
@mock.patch('functest.utils.openstack_utils.logger.error')
def test_delete_volume_type_exception(self, mock_logger_error):
    """Check the failure path of ``delete_volume_type``.

    The ``Exception`` class itself is passed where the other tests pass
    a client object. The call is expected to return a falsy value and
    the patched ``logger.error`` is expected to have been called.
    """
    # NOTE(review): presumably using the bare Exception class as the
    # client makes the call inside delete_volume_type raise -- confirm
    # against openstack_utils.
    outcome = openstack_utils.delete_volume_type(
        Exception, self.volume_types[0])
    self.assertFalse(outcome)
    self.assertTrue(mock_logger_error.called)
1657 def test_get_tenants_default(self):
1658 with mock.patch('functest.utils.openstack_utils.'
1659 'is_keystone_v3', return_value=True):
1660 self.assertEqual(openstack_utils.
1661 get_tenants(self.keystone_client),
1663 with mock.patch('functest.utils.openstack_utils.'
1664 'is_keystone_v3', return_value=False):
1665 self.assertEqual(openstack_utils.
1666 get_tenants(self.keystone_client),
1669 @mock.patch('functest.utils.openstack_utils.logger.error')
1670 def test_get_tenants_exception(self, mock_logger_error):
1671 self.assertEqual(openstack_utils.
1672 get_tenants(Exception),
1674 self.assertTrue(mock_logger_error.called)
1676 def test_get_users_default(self):
1677 self.assertEqual(openstack_utils.
1678 get_users(self.keystone_client),
1681 @mock.patch('functest.utils.openstack_utils.logger.error')
1682 def test_get_users_exception(self, mock_logger_error):
1683 self.assertEqual(openstack_utils.
1684 get_users(Exception),
1686 self.assertTrue(mock_logger_error.called)
1688 def test_get_tenant_id_default(self):
1689 self.assertEqual(openstack_utils.
1690 get_tenant_id(self.keystone_client,
1694 def test_get_user_id_default(self):
1695 self.assertEqual(openstack_utils.
1696 get_user_id(self.keystone_client,
1700 def test_get_role_id_default(self):
1701 self.assertEqual(openstack_utils.
1702 get_role_id(self.keystone_client,
1706 def test_create_tenant_default(self):
1707 with mock.patch('functest.utils.openstack_utils.'
1708 'is_keystone_v3', return_value=True):
1709 self.assertEqual(openstack_utils.
1710 create_tenant(self.keystone_client,
1714 with mock.patch('functest.utils.openstack_utils.'
1715 'is_keystone_v3', return_value=False):
1716 self.assertEqual(openstack_utils.
1717 create_tenant(self.keystone_client,
1722 @mock.patch('functest.utils.openstack_utils.logger.error')
1723 def test_create_tenant_exception(self, mock_logger_error):
1724 self.assertEqual(openstack_utils.
1725 create_tenant(Exception,
1729 self.assertTrue(mock_logger_error.called)
1731 def test_create_user_default(self):
1732 with mock.patch('functest.utils.openstack_utils.'
1733 'is_keystone_v3', return_value=True):
1734 self.assertEqual(openstack_utils.
1735 create_user(self.keystone_client,
1741 with mock.patch('functest.utils.openstack_utils.'
1742 'is_keystone_v3', return_value=False):
1743 self.assertEqual(openstack_utils.
1744 create_user(self.keystone_client,
1751 @mock.patch('functest.utils.openstack_utils.logger.error')
1752 def test_create_user_exception(self, mock_logger_error):
1753 self.assertEqual(openstack_utils.
1754 create_user(Exception,
1760 self.assertTrue(mock_logger_error.called)
1762 def test_add_role_user_default(self):
1763 with mock.patch('functest.utils.openstack_utils.'
1764 'is_keystone_v3', return_value=True):
1765 self.assertTrue(openstack_utils.
1766 add_role_user(self.keystone_client,
1771 with mock.patch('functest.utils.openstack_utils.'
1772 'is_keystone_v3', return_value=False):
1773 self.assertTrue(openstack_utils.
1774 add_role_user(self.keystone_client,
1779 @mock.patch('functest.utils.openstack_utils.logger.error')
1780 def test_add_role_user_exception(self, mock_logger_error):
1781 self.assertFalse(openstack_utils.
1782 add_role_user(Exception,
1786 self.assertTrue(mock_logger_error.called)
1788 def test_delete_tenant_default(self):
1789 with mock.patch('functest.utils.openstack_utils.'
1790 'is_keystone_v3', return_value=True):
1791 self.assertTrue(openstack_utils.
1792 delete_tenant(self.keystone_client,
1795 with mock.patch('functest.utils.openstack_utils.'
1796 'is_keystone_v3', return_value=False):
1797 self.assertTrue(openstack_utils.
1798 delete_tenant(self.keystone_client,
1801 @mock.patch('functest.utils.openstack_utils.logger.error')
1802 def test_delete_tenant_exception(self, mock_logger_error):
1803 self.assertFalse(openstack_utils.
1804 delete_tenant(Exception,
1806 self.assertTrue(mock_logger_error.called)
1808 def test_delete_user_default(self):
1809 self.assertTrue(openstack_utils.
1810 delete_user(self.keystone_client,
1813 @mock.patch('functest.utils.openstack_utils.logger.error')
1814 def test_delete_user_exception(self, mock_logger_error):
1815 self.assertFalse(openstack_utils.
1816 delete_user(Exception,
1818 self.assertTrue(mock_logger_error.called)
1820 def test_get_resource_default(self):
1821 with mock.patch('functest.utils.openstack_utils.'
1822 'is_keystone_v3', return_value=True):
1823 self.assertEqual(openstack_utils.
1824 get_resource(self.heat_client,
1829 @mock.patch('functest.utils.openstack_utils.logger.error')
1830 def test_get_resource_exception(self, mock_logger_error):
1831 self.assertEqual(openstack_utils.
1832 get_resource(Exception,
1836 self.assertTrue(mock_logger_error.called)
if __name__ == "__main__":
    # Runs only when this module is executed directly as a script.
    # Disable every log record at CRITICAL level and below before the
    # tests start.
    logging.disable(logging.CRITICAL)
    # verbosity=2 asks unittest for its most detailed per-test report.
    unittest.main(verbosity=2)