3 # All rights reserved. This program and the accompanying materials
4 # are made available under the terms of the Apache License, Version 2.0
5 # which accompanies this distribution, and is available at
6 # http://www.apache.org/licenses/LICENSE-2.0
12 from functest.utils import openstack_tacker
13 from functest.tests.unit import test_utils
# Unit tests for the helpers in functest.utils.openstack_tacker, exercised
# against a mock.Mock() standing in for the tacker client.
16 class OSTackerTesting(unittest.TestCase):
# Suppress log output of severity CRITICAL and below while the tests run.
18 logging.disable(logging.CRITICAL)
# Fixture attributes read by the test methods.
21 self.tacker_client = mock.Mock()
# Response handed back by the mocked client `get` call: each resource
# collection holds a single entry whose id is 'test_id'.
22 self.getresponse = {'vnfds': [{'id': 'test_id'}],
23 'vnfs': [{'id': 'test_id'}],
24 'sfcs': [{'id': 'test_id'}]}
# Responses handed back by the mocked client list_* calls (two entries each).
25 self.vnfdlist = {'vnfds': [{'id': 'test_vnfd1'}, {'id': 'test_vnfd2'}]}
26 self.vnflist = {'vnfs': [{'id': 'test_vnf1'}, {'id': 'test_vnf2'}]}
27 self.sfclist = {'sfcs': [{'id': 'test_sfc1'}, {'id': 'test_sfc2'}]}
28 self.sfc_classifierlist = {'sfc_classifiers': [{'id': 'test_sfc_cl1'},
29 {'id': 'test_sfc_cl2'}]}
# Responses handed back by the mocked client create_* calls.
31 self.createvnfd = {"vnfd": {"attributes": {"vnfd": 'vnfd_body'}}}
32 self.createvnf = {"vnf": {"attributes": {"vnf": 'vnf_body'}}}
33 self.createsfc = {"sfc": {"attributes": {"sfc": 'sfc_body'}}}
34 self.createsfc_clf = {"sfc_classifier": {"attributes":
35 {"sfc_clf": 'sfc_clf_body'}}}
# Simple string fixtures (names, ids, file name) for the calls under test.
37 self.resource_type = 'vnfd'
38 self.resource_name = 'resource_name'
39 self.tosca_file = 'test_tosca_file'
40 self.vnfd = 'test_vnfd'
43 self.sfc_clf = 'test_sfc_clf'
47 'OS_USERNAME': 'username',
48 'OS_PASSWORD': 'password',
49 'OS_AUTH_URL': 'auth_url',
50 'OS_TENANT_NAME': 'tenant_name',
51 'OS_USER_DOMAIN_NAME': 'user_domain_name',
52 'OS_PROJECT_DOMAIN_NAME': 'project_domain_name',
53 'OS_PROJECT_NAME': 'project_name',
54 'OS_ENDPOINT_TYPE': 'endpoint_type',
55 'OS_REGION_NAME': 'region_name'
# get_id_from_name should return 'test_id' when the client's `get` is
# patched to return self.getresponse.
59 def test_get_id_from_name(self):
60 with mock.patch.object(self.tacker_client, 'get',
61 return_value=self.getresponse):
62 resp = openstack_tacker.get_id_from_name(self.tacker_client,
65 self.assertEqual(resp, 'test_id')
# When the client's `get` raises Exception, get_id_from_name should return
# None and the module's logger.error (patched here) should be called exactly
# once with a message matched through test_utils.SubstrMatch.
67 @mock.patch('functest.utils.openstack_tacker.logger.error')
68 def test_get_id_from_name_exception(self, mock_logger_error):
69 with mock.patch.object(self.tacker_client, 'get',
70 side_effect=Exception):
71 resp = openstack_tacker.get_id_from_name(self.tacker_client,
74 self.assertIsNone(resp)
75 mock_logger_error.assert_called_once_with(test_utils.
76 SubstrMatch("Error [get"
# get_vnfd_id should delegate to get_id_from_name (patched here): the helper
# must be called exactly once, with the tacker client as its first argument.
85 @mock.patch('functest.utils.openstack_tacker.get_id_from_name')
86 def test_get_vnfd_id(self, mock_get_id_from_name):
87 openstack_tacker.get_vnfd_id(self.tacker_client, self.resource_name)
88 mock_get_id_from_name.assert_called_once_with(self.tacker_client,
# get_vnf_id should delegate to get_id_from_name (patched here): the helper
# must be called exactly once, with the tacker client as its first argument.
92 @mock.patch('functest.utils.openstack_tacker.get_id_from_name')
93 def test_get_vnf_id(self, mock_get_id_from_name):
94 openstack_tacker.get_vnf_id(self.tacker_client, self.resource_name)
95 mock_get_id_from_name.assert_called_once_with(self.tacker_client,
# get_sfc_id should delegate to get_id_from_name (patched here): the helper
# must be called exactly once, with the tacker client as its first argument.
99 @mock.patch('functest.utils.openstack_tacker.get_id_from_name')
100 def test_get_sfc_id(self, mock_get_id_from_name):
101 openstack_tacker.get_sfc_id(self.tacker_client, self.resource_name)
102 mock_get_id_from_name.assert_called_once_with(self.tacker_client,
# get_sfc_classifier_id should delegate to get_id_from_name (patched here):
# the helper must be called exactly once, with the tacker client as its
# first argument.
106 @mock.patch('functest.utils.openstack_tacker.get_id_from_name')
107 def test_get_sfc_classifier_id(self, mock_get_id_from_name):
108 openstack_tacker.get_sfc_classifier_id(self.tacker_client,
110 mock_get_id_from_name.assert_called_once_with(self.tacker_client,
# With client.list_vnfds patched to return self.vnfdlist, list_vnfds should
# return just the ids: ['test_vnfd1', 'test_vnfd2'].
114 def test_list_vnfds(self):
115 with mock.patch.object(self.tacker_client, 'list_vnfds',
116 return_value=self.vnfdlist):
117 resp = openstack_tacker.list_vnfds(self.tacker_client,
119 self.assertEqual(resp, ['test_vnfd1', 'test_vnfd2'])
# With client.list_vnfds patched to return self.vnfdlist, this call to
# list_vnfds should return that response unchanged.
# NOTE(review): presumably invoked with verbose output enabled, per the test
# name -- confirm against the call arguments.
121 def test_list_vnfds_verbose(self):
122 with mock.patch.object(self.tacker_client, 'list_vnfds',
123 return_value=self.vnfdlist):
124 resp = openstack_tacker.list_vnfds(self.tacker_client,
126 self.assertEqual(resp, self.vnfdlist)
# When client.list_vnfds raises Exception, list_vnfds should return None and
# the module's logger.error (patched here) should be called exactly once.
128 @mock.patch('functest.utils.openstack_tacker.logger.error')
129 def test_list_vnfds_exception(self, mock_logger_error):
130 with mock.patch.object(self.tacker_client, 'list_vnfds',
131 side_effect=Exception):
132 resp = openstack_tacker.list_vnfds(self.tacker_client,
134 mock_logger_error.assert_called_once_with(test_utils.
140 self.assertIsNone(resp)
# With client.create_vnfd patched to return self.createvnfd, create_vnfd
# should return that same response.
# NOTE(review): the test name suggests no TOSCA file is supplied -- confirm
# against the call arguments.
142 def test_create_vnfd_missing_file(self):
143 with mock.patch.object(self.tacker_client, 'create_vnfd',
144 return_value=self.createvnfd):
145 resp = openstack_tacker.create_vnfd(self.tacker_client,
147 self.assertEqual(resp, self.createvnfd)
# create_vnfd called with tosca_file=self.tosca_file should open that file
# exactly once and return the client's create_vnfd response
# (self.createvnfd). The built-in open is replaced by mock.mock_open with
# read_data='1', so no real file is touched.
# NOTE(review): '__builtin__' is the Python 2 module name; under Python 3
# the patch target would be 'builtins.open'.
# NOTE(review): `m` is presumably the name the patched open is bound to --
# confirm.
149 def test_create_vnfd_default(self):
150 with mock.patch.object(self.tacker_client, 'create_vnfd',
151 return_value=self.createvnfd), \
152 mock.patch('__builtin__.open', mock.mock_open(read_data='1')) \
154 resp = openstack_tacker.create_vnfd(self.tacker_client,
155 tosca_file=self.tosca_file)
156 m.assert_called_once_with(self.tosca_file)
157 self.assertEqual(resp, self.createvnfd)
# When client.create_vnfd raises Exception, create_vnfd (called with
# tosca_file=self.tosca_file) should return None and the module's
# logger.error (patched here) should be called exactly once.
159 @mock.patch('functest.utils.openstack_tacker.logger.error')
160 def test_create_vnfd_exception(self, mock_logger_error):
161 with mock.patch.object(self.tacker_client, 'create_vnfd',
162 side_effect=Exception):
163 resp = openstack_tacker.create_vnfd(self.tacker_client,
164 tosca_file=self.tosca_file)
165 mock_logger_error.assert_called_once_with(test_utils.
174 self.assertIsNone(resp)
# With get_vnfd_id and client.delete_vnfd both patched to return self.vnfd,
# delete_vnfd should return self.vnfd.
176 def test_delete_vnfd(self):
177 with mock.patch('functest.utils.openstack_tacker.get_vnfd_id',
178 return_value=self.vnfd), \
179 mock.patch.object(self.tacker_client, 'delete_vnfd',
180 return_value=self.vnfd):
181 resp = openstack_tacker.delete_vnfd(self.tacker_client,
184 self.assertEqual(resp, self.vnfd)
186 # TODO: Exception('You need to provide an VNFD'
187 # 'id or name') AssertionError
# When client.delete_vnfd raises Exception (get_vnfd_id is patched to return
# self.vnfd), delete_vnfd should return None and the module's logger.error
# (patched here) should have been called at least once.
189 @mock.patch('functest.utils.openstack_tacker.logger.error')
190 def test_delete_vnfd_exception(self, mock_logger_error):
191 with mock.patch('functest.utils.openstack_tacker.get_vnfd_id',
192 return_value=self.vnfd), \
193 mock.patch.object(self.tacker_client, 'delete_vnfd',
194 side_effect=Exception):
195 resp = openstack_tacker.delete_vnfd(self.tacker_client,
198 self.assertIsNone(resp)
199 self.assertTrue(mock_logger_error.called)
# With client.list_vnfs patched to return self.vnflist, list_vnfs should
# return just the ids: ['test_vnf1', 'test_vnf2'].
201 def test_list_vnfs(self):
202 with mock.patch.object(self.tacker_client, 'list_vnfs',
203 return_value=self.vnflist):
204 resp = openstack_tacker.list_vnfs(self.tacker_client,
206 self.assertEqual(resp, ['test_vnf1', 'test_vnf2'])
# With client.list_vnfs patched to return self.vnflist, this call to
# list_vnfs should return that response unchanged.
# NOTE(review): presumably invoked with verbose output enabled, per the test
# name -- confirm against the call arguments.
208 def test_list_vnfs_verbose(self):
209 with mock.patch.object(self.tacker_client, 'list_vnfs',
210 return_value=self.vnflist):
211 resp = openstack_tacker.list_vnfs(self.tacker_client,
213 self.assertEqual(resp, self.vnflist)
# When client.list_vnfs raises Exception, list_vnfs should return None and
# the module's logger.error (patched here) should be called exactly once.
215 @mock.patch('functest.utils.openstack_tacker.logger.error')
216 def test_list_vnfs_exception(self, mock_logger_error):
217 with mock.patch.object(self.tacker_client, 'list_vnfs',
218 side_effect=Exception):
219 resp = openstack_tacker.list_vnfs(self.tacker_client,
221 mock_logger_error.assert_called_once_with(test_utils.
227 self.assertIsNone(resp)
# With client.create_vnf patched to return self.createvnf and get_vnfd_id
# patched to return self.vnf, create_vnf should return self.createvnf.
229 def test_create_vnf_default(self):
230 with mock.patch.object(self.tacker_client, 'create_vnf',
231 return_value=self.createvnf), \
232 mock.patch('functest.utils.openstack_tacker.get_vnfd_id',
233 return_value=self.vnf):
234 resp = openstack_tacker.create_vnf(self.tacker_client,
238 self.assertEqual(resp, self.createvnf)
# When client.create_vnf raises Exception, create_vnf should return None and
# the module's logger.error (patched here) should be called exactly once.
240 @mock.patch('functest.utils.openstack_tacker.logger.error')
241 def test_create_vnf_exception(self, mock_logger_error):
242 with mock.patch.object(self.tacker_client, 'create_vnf',
243 side_effect=Exception):
244 resp = openstack_tacker.create_vnf(self.tacker_client,
248 mock_logger_error.assert_called_once_with(test_utils.
254 self.assertIsNone(resp)
# With get_vnf_id and client.delete_vnf both patched to return self.vnf,
# delete_vnf should return self.vnf.
258 def test_delete_vnf(self):
259 with mock.patch('functest.utils.openstack_tacker.get_vnf_id',
260 return_value=self.vnf), \
261 mock.patch.object(self.tacker_client, 'delete_vnf',
262 return_value=self.vnf):
263 resp = openstack_tacker.delete_vnf(self.tacker_client,
266 self.assertEqual(resp, self.vnf)
268 # TODO: Exception('You need to provide an VNF'
269 # 'classifier id or name') AssertionError
# When client.delete_vnf raises Exception (get_vnf_id is patched to return
# self.vnf), delete_vnf should return None and the module's logger.error
# (patched here) should have been called at least once.
271 @mock.patch('functest.utils.openstack_tacker.logger.error')
272 def test_delete_vnf_exception(self, mock_logger_error):
273 with mock.patch('functest.utils.openstack_tacker.get_vnf_id',
274 return_value=self.vnf), \
275 mock.patch.object(self.tacker_client, 'delete_vnf',
276 side_effect=Exception):
277 resp = openstack_tacker.delete_vnf(self.tacker_client,
280 self.assertIsNone(resp)
281 self.assertTrue(mock_logger_error.called)
# With client.list_sfcs patched to return self.sfclist, list_sfcs should
# return just the ids: ['test_sfc1', 'test_sfc2'].
283 def test_list_sfcs(self):
284 with mock.patch.object(self.tacker_client, 'list_sfcs',
285 return_value=self.sfclist):
286 resp = openstack_tacker.list_sfcs(self.tacker_client,
288 self.assertEqual(resp, ['test_sfc1', 'test_sfc2'])
# With client.list_sfcs patched to return self.sfclist, this call to
# list_sfcs should return that response unchanged.
# NOTE(review): presumably invoked with verbose output enabled, per the test
# name -- confirm against the call arguments.
290 def test_list_sfcs_verbose(self):
291 with mock.patch.object(self.tacker_client, 'list_sfcs',
292 return_value=self.sfclist):
293 resp = openstack_tacker.list_sfcs(self.tacker_client,
295 self.assertEqual(resp, self.sfclist)
# When client.list_sfcs raises Exception, list_sfcs should return None and
# the module's logger.error (patched here) should be called exactly once.
297 @mock.patch('functest.utils.openstack_tacker.logger.error')
298 def test_list_sfcs_exception(self, mock_logger_error):
299 with mock.patch.object(self.tacker_client, 'list_sfcs',
300 side_effect=Exception):
301 resp = openstack_tacker.list_sfcs(self.tacker_client,
303 mock_logger_error.assert_called_once_with(test_utils.
309 self.assertIsNone(resp)
# With client.create_sfc patched to return self.createsfc and get_vnf_id
# patched to return self.vnf, create_sfc (called with both chain_vnf_ids and
# chain_vnf_names) should return self.createsfc.
311 def test_create_sfc_default(self):
312 with mock.patch.object(self.tacker_client, 'create_sfc',
313 return_value=self.createsfc), \
314 mock.patch('functest.utils.openstack_tacker.get_vnf_id',
315 return_value=self.vnf):
316 resp = openstack_tacker.create_sfc(self.tacker_client,
318 chain_vnf_ids=['chain_vnf_id'],
319 chain_vnf_names=[self.vnf])
320 self.assertEqual(resp, self.createsfc)
# When client.create_sfc raises Exception, create_sfc (called with both
# chain_vnf_ids and chain_vnf_names) should return None and the module's
# logger.error (patched here) should be called exactly once.
322 @mock.patch('functest.utils.openstack_tacker.logger.error')
323 def test_create_sfc_exception(self, mock_logger_error):
324 with mock.patch.object(self.tacker_client, 'create_sfc',
325 side_effect=Exception):
326 resp = openstack_tacker.create_sfc(self.tacker_client,
328 chain_vnf_ids=['chain_vnf_id'],
329 chain_vnf_names=[self.vnf])
330 mock_logger_error.assert_called_once_with(test_utils.
336 self.assertIsNone(resp)
# With get_sfc_id and client.delete_sfc both patched to return self.sfc,
# delete_sfc should return self.sfc.
338 def test_delete_sfc(self):
339 with mock.patch('functest.utils.openstack_tacker.get_sfc_id',
340 return_value=self.sfc), \
341 mock.patch.object(self.tacker_client, 'delete_sfc',
342 return_value=self.sfc):
343 resp = openstack_tacker.delete_sfc(self.tacker_client,
346 self.assertEqual(resp, self.sfc)
348 # TODO: Exception('You need to provide an SFC'
349 # 'id or name') AssertionError
# When client.delete_sfc raises Exception (get_sfc_id is patched to return
# self.sfc), delete_sfc should return None and the module's logger.error
# (patched here) should have been called at least once.
351 @mock.patch('functest.utils.openstack_tacker.logger.error')
352 def test_delete_sfc_exception(self, mock_logger_error):
353 with mock.patch('functest.utils.openstack_tacker.get_sfc_id',
354 return_value=self.sfc), \
355 mock.patch.object(self.tacker_client, 'delete_sfc',
356 side_effect=Exception):
357 resp = openstack_tacker.delete_sfc(self.tacker_client,
360 self.assertIsNone(resp)
361 self.assertTrue(mock_logger_error.called)
# With client.list_sfc_classifiers patched to return self.sfc_classifierlist,
# list_sfc_classifiers should return just the ids:
# ['test_sfc_cl1', 'test_sfc_cl2'].
363 def test_list_sfc_classifiers(self):
364 with mock.patch.object(self.tacker_client, 'list_sfc_classifiers',
365 return_value=self.sfc_classifierlist):
366 resp = openstack_tacker.list_sfc_classifiers(self.tacker_client,
368 self.assertEqual(resp, ['test_sfc_cl1', 'test_sfc_cl2'])
# With client.list_sfc_classifiers patched to return self.sfc_classifierlist,
# this call to list_sfc_classifiers should return that response unchanged.
# NOTE(review): presumably invoked with verbose output enabled, per the test
# name -- confirm against the call arguments.
370 def test_list_sfc_classifiers_verbose(self):
371 with mock.patch.object(self.tacker_client, 'list_sfc_classifiers',
372 return_value=self.sfc_classifierlist):
373 resp = openstack_tacker.list_sfc_classifiers(self.tacker_client,
375 self.assertEqual(resp, self.sfc_classifierlist)
# When client.list_sfc_classifiers raises Exception, list_sfc_classifiers
# should return None and the module's logger.error (patched here) should be
# called exactly once.
377 @mock.patch('functest.utils.openstack_tacker.logger.error')
378 def test_list_sfc_classifiers_exception(self, mock_logger_error):
379 with mock.patch.object(self.tacker_client, 'list_sfc_classifiers',
380 side_effect=Exception):
381 resp = openstack_tacker.list_sfc_classifiers(self.tacker_client,
383 mock_logger_error.assert_called_once_with(test_utils.
390 self.assertIsNone(resp)
# With client.create_sfc_classifier patched to return self.createsfc_clf and
# get_sfc_id patched to return self.sfc, create_sfc_classifier should return
# self.createsfc_clf.
392 def test_create_sfc_classifier_default(self):
393 with mock.patch.object(self.tacker_client, 'create_sfc_classifier',
394 return_value=self.createsfc_clf), \
395 mock.patch('functest.utils.openstack_tacker.get_sfc_id',
396 return_value=self.sfc):
398 resp = openstack_tacker.create_sfc_classifier(self.tacker_client,
402 self.assertEqual(resp, self.createsfc_clf)
# When client.create_sfc_classifier raises Exception, create_sfc_classifier
# should return None and the module's logger.error (patched here) should be
# called exactly once.
404 @mock.patch('functest.utils.openstack_tacker.logger.error')
405 def test_sfc_classifier_exception(self, mock_logger_error):
406 with mock.patch.object(self.tacker_client, 'create_sfc_classifier',
407 side_effect=Exception):
409 resp = openstack_tacker.create_sfc_classifier(self.tacker_client,
413 mock_logger_error.assert_called_once_with(test_utils.
420 self.assertIsNone(resp)
# With a get_sfc_* lookup helper of openstack_tacker (presumably
# get_sfc_classifier_id -- the patch target string is split across lines)
# and client.delete_sfc_classifier both patched to return self.sfc_clf,
# delete_sfc_classifier should return `cl`.
# NOTE(review): for the assertion to hold, `cl` must equal self.sfc_clf (the
# mocked return value) -- confirm it is bound before the assertion.
422 def test_delete_sfc_classifier(self):
423 with mock.patch('functest.utils.openstack_tacker.get_sfc_'
425 return_value=self.sfc_clf), \
426 mock.patch.object(self.tacker_client, 'delete_sfc_classifier',
427 return_value=self.sfc_clf):
429 resp = openstack_tacker.delete_sfc_classifier(self.tacker_client,
432 self.assertEqual(resp, cl)
434 # TODO: Exception('You need to provide an SFC'
435 # 'classifier id or name') AssertionError
# When client.delete_sfc_classifier raises Exception (a get_sfc_* lookup
# helper of openstack_tacker is patched to return self.sfc_clf),
# delete_sfc_classifier should return None and the module's logger.error
# (patched here) should have been called at least once.
437 @mock.patch('functest.utils.openstack_tacker.logger.error')
438 def test_delete_sfc_classifier_exception(self, mock_logger_error):
439 with mock.patch('functest.utils.openstack_tacker.get_sfc_'
441 return_value=self.sfc_clf), \
442 mock.patch.object(self.tacker_client, 'delete_sfc_classifier',
443 side_effect=Exception):
445 resp = openstack_tacker.delete_sfc_classifier(self.tacker_client,
448 self.assertIsNone(resp)
449 self.assertTrue(mock_logger_error.called)
# When executed directly as a script, run the test cases with verbosity
# level 2.
452 if __name__ == "__main__":
453 unittest.main(verbosity=2)