3 # Copyright (c) 2016 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
17 from keystoneauth1.exceptions import auth_plugins
18 from robot.errors import RobotError
19 from robot.result import testcase
21 mock.patch('logging.FileHandler').start() # noqa
22 from functest.core import testcase_base
23 from functest.opnfv_tests.sdn.odl import odl
# Unit tests for the ODL test-case wrapper (odl.ODLTests) and its robot
# result visitor (odl.ODLResultVisitor).
# NOTE(review): this view of the file is lossy -- gaps in the embedded
# numbering show that some original lines are absent, so a few statements
# below have lost their enclosing `def` line or their body.
26 class ODLTesting(unittest.TestCase):
# Disables logging calls of severity CRITICAL and below.
28 logging.disable(logging.CRITICAL)
# Fixture values shared by the tests: service addresses, OpenStack
# credentials, then the ODL port and credentials.
30 _keystone_ip = "127.0.0.1"
31 _neutron_ip = "127.0.0.2"
32 _sdn_controller_ip = "127.0.0.3"
33 _os_tenantname = "admin"
34 _os_username = "admin"
35 _os_password = "admin"
# NOTE(review): self._odl_webport is read by several tests but its
# assignment is not visible in this view.
37 _odl_restconfport = "8181"
38 _odl_username = "admin"
39 _odl_password = "admin"
# NOTE(review): the `def` line owning the statements below is not visible;
# because they build self.test, which every test reads, this is presumably
# the per-test setUp fixture -- confirm.
# The loop body is not visible either (presumably it clears these
# variables from os.environ -- confirm).
42 for var in ("INSTALLER_TYPE", "SDN_CONTROLLER", "SDN_CONTROLLER_IP"):
# OpenStack credentials exported through the environment.
45 os.environ["OS_USERNAME"] = self._os_username
46 os.environ["OS_PASSWORD"] = self._os_password
47 os.environ["OS_TENANT_NAME"] = self._os_tenantname
# Object under test, referenced as self.test by the test methods.
48 self.test = odl.ODLTests()
def test_empty_visitor(self):
    # A freshly created visitor that has visited nothing must report
    # no data.
    collected = odl.ODLResultVisitor().get_data()
    self.assertFalse(collected)
# Build one robot TestCase from the `data` dict, let ODLResultVisitor visit
# it, and expect get_data() to return that same dict in a one-element list.
# NOTE(review): some original lines of this method are missing from this
# view -- the `data` literal, the TestCase(...) call and the `config` dict
# are visibly truncated (e.g. data['status'] and data['parent'] are read
# but their entries are not shown).
54 def test_visitor(self):
55 visitor = odl.ODLResultVisitor()
56 data = {'name': 'foo',
59 'starttime': "20161216 16:00:00.000",
60 'endtime': "20161216 16:00:01.000",
62 'text': 'Hello, World!',
64 test = testcase.TestCase(name=data['name'],
65 status=data['status'],
67 starttime=data['starttime'],
68 endtime=data['endtime'])
# The parent suite is a Mock; its name and criticality answer are
# configured from `config` below.
69 test.parent = mock.Mock()
70 config = {'name': data['parent'],
71 'criticality.test_is_critical.return_value': data[
73 test.parent.configure_mock(**config)
74 visitor.visit_test(test)
75 self.assertEqual(visitor.get_data(), [data])
@mock.patch('fileinput.input', side_effect=Exception())
def test_set_robotframework_vars_failed(self, *args):
    # fileinput.input is patched to raise; the call is expected to come
    # back falsy instead of letting the error escape.
    outcome = self.test.set_robotframework_vars()
    self.assertFalse(outcome)
@mock.patch('fileinput.input', return_value=[])
def test_set_robotframework_vars_empty(self, args):
    # fileinput.input is patched to yield no lines at all; the call is
    # still expected to come back truthy.
    outcome = self.test.set_robotframework_vars()
    self.assertTrue(outcome)
@mock.patch('sys.stdout', new_callable=StringIO.StringIO)
def _test_set_robotframework_vars(self, msg1, msg2, *args):
    """Feed one line to set_robotframework_vars() and check the output.

    msg1 is the only line yielded by the patched fileinput.input.
    msg2 is the text expected on the captured stdout; a newline is
    appended before comparing.
    args[0] is the StringIO object that replaces sys.stdout (injected
    by the decorator).
    """
    fake_input = mock.MagicMock()
    fake_input.__iter__.return_value = [msg1]
    variables_file = os.path.join(odl.ODLTests.odl_test_repo,
                                  'csit/variables/Variables.py')
    expected_output = "{}\n".format(msg2)
    with mock.patch('fileinput.input',
                    return_value=fake_input) as mock_method:
        self.assertTrue(self.test.set_robotframework_vars())
        # The variables file must have been opened once, in place.
        mock_method.assert_called_once_with(variables_file, inplace=True)
        self.assertEqual(args[0].getvalue(), expected_output)
def test_set_robotframework_vars_auth_default(self):
    # With no explicit credentials the AUTH line is expected to be
    # printed back with admin/admin filled in.
    original_line = "AUTH = []"
    rewritten_line = "AUTH = [u'admin', u'admin']"
    self._test_set_robotframework_vars(original_line, rewritten_line)
def test_set_robotframework_vars_auth1(self):
    # An AUTH1 line is expected to be printed back unchanged.
    untouched_line = "AUTH1 = []"
    self._test_set_robotframework_vars(untouched_line, untouched_line)
@mock.patch('sys.stdout', new_callable=StringIO.StringIO)
def test_set_robotframework_vars_auth_foo(self, *args):
    # Explicit credentials ('foo', 'bar') are expected to show up in the
    # printed AUTH line; args[0] is the StringIO replacing sys.stdout.
    fake_input = mock.MagicMock()
    fake_input.__iter__.return_value = ["AUTH = []"]
    variables_file = os.path.join(odl.ODLTests.odl_test_repo,
                                  'csit/variables/Variables.py')
    expected_output = "AUTH = [u'{}', u'{}']\n".format('foo', 'bar')
    with mock.patch('fileinput.input',
                    return_value=fake_input) as mock_method:
        self.assertTrue(self.test.set_robotframework_vars('foo', 'bar'))
        # The variables file must have been opened once, in place.
        mock_method.assert_called_once_with(variables_file, inplace=True)
        self.assertEqual(args[0].getvalue(), expected_output)
# Stand-in passed as side_effect for the patched
# functest.utils.openstack_utils.get_endpoint: maps a service type to a URL
# built from the class-level fixture addresses.
# NOTE(review): the first parameter is `cls`, so a decorator (presumably
# @classmethod) likely precedes this def but is not visible; the lines
# following the `elif` branch are not visible either, so the result for
# other service types cannot be stated from here.
116 def _fake_url_for(cls, service_type='identity', **kwargs):
117 if service_type == 'identity':
118 return "http://{}:5000/v2.0".format(
119 ODLTesting._keystone_ip)
120 elif service_type == 'network':
121 return "http://{}:9696".format(ODLTesting._neutron_ip)
# Assemble the keyword arguments handed to ODLTests.main() from the
# class-level fixture values.
# NOTE(review): the end of this method is missing from this view -- `key`
# is not used in the visible lines and no return statement is shown.
# Callers unpack the result with **, so a dict is presumably returned,
# possibly without the entry named by `key` -- confirm.
125 def _get_main_kwargs(self, key=None):
126 kwargs = {'odlusername': self._odl_username,
127 'odlpassword': self._odl_password,
128 'keystoneip': self._keystone_ip,
129 'neutronip': self._neutron_ip,
130 'osusername': self._os_username,
131 'ostenantname': self._os_tenantname,
132 'ospassword': self._os_password,
133 'odlip': self._sdn_controller_ip,
134 'odlwebport': self._odl_webport,
135 'odlrestconfport': self._odl_restconfport}
# Call main() with the complete keyword set, compare its result with
# `status`, then check the calls recorded by the mocks passed in *args.
# Judging by the decorators on the visible callers (os.remove / robot.run /
# os.makedirs, which mock applies bottom-up), args[0] is presumably the
# os.makedirs mock, args[1] robot.run and args[2] os.remove -- confirm.
# NOTE(review): several original lines are missing from this view: some
# callers pass fewer than three mocks, so the args[...] checks are
# presumably guarded; the keyword list of args[1].assert_called_once_with
# is truncated; and `variable` is not visibly used.
140 def _test_main(self, status, *args):
141 kwargs = self._get_main_kwargs()
142 self.assertEqual(self.test.main(**kwargs), status)
# The results directory must have been requested exactly once.
144 args[0].assert_called_once_with(
145 odl.ODLTests.res_dir)
# NAME:value strings derived from the fixture values.
147 variable = ['KEYSTONE:{}'.format(self._keystone_ip),
148 'NEUTRON:{}'.format(self._neutron_ip),
149 'OSUSERNAME:"{}"'.format(self._os_username),
150 'OSTENANTNAME:"{}"'.format(self._os_tenantname),
151 'OSPASSWORD:"{}"'.format(self._os_password),
152 'ODL_SYSTEM_IP:{}'.format(self._sdn_controller_ip),
153 'PORT:{}'.format(self._odl_webport),
154 'RESTCONFPORT:{}'.format(self._odl_restconfport)]
# Both suite directories are expected as positional arguments, with the
# output file located in the results directory.
155 args[1].assert_called_once_with(
156 odl.ODLTests.basic_suite_dir,
157 odl.ODLTests.neutron_suite_dir,
159 output=os.path.join(odl.ODLTests.res_dir, 'output.xml'),
# The last recorded call of args[2] must target stdout.txt in the results
# directory.
164 args[2].assert_called_with(
165 os.path.join(odl.ODLTests.res_dir, 'stdout.txt'))
def _test_main_missing_keyword(self, key):
    """Check that main() returns EX_RUN_ERROR for the kwargs built for key.

    key is forwarded to _get_main_kwargs(); judging by this helper's
    name, the matching entry is presumably left out of the result --
    confirm against _get_main_kwargs().
    """
    main_kwargs = self._get_main_kwargs(key)
    outcome = self.test.main(**main_kwargs)
    self.assertEqual(outcome, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_main_missing_odlusername(self):
    # Delegates to the missing-keyword helper for 'odlusername'.
    keyword = 'odlusername'
    self._test_main_missing_keyword(keyword)
def test_main_missing_odlpassword(self):
    # Delegates to the missing-keyword helper for 'odlpassword'.
    keyword = 'odlpassword'
    self._test_main_missing_keyword(keyword)
def test_main_missing_keystoneip(self):
    # Delegates to the missing-keyword helper for 'keystoneip'.
    keyword = 'keystoneip'
    self._test_main_missing_keyword(keyword)
def test_main_missing_neutronip(self):
    # Delegates to the missing-keyword helper for 'neutronip'.
    keyword = 'neutronip'
    self._test_main_missing_keyword(keyword)
def test_main_missing_osusername(self):
    # Delegates to the missing-keyword helper for 'osusername'.
    keyword = 'osusername'
    self._test_main_missing_keyword(keyword)
def test_main_missing_ostenantname(self):
    # Delegates to the missing-keyword helper for 'ostenantname'.
    keyword = 'ostenantname'
    self._test_main_missing_keyword(keyword)
def test_main_missing_ospassword(self):
    # Delegates to the missing-keyword helper for 'ospassword'.
    keyword = 'ospassword'
    self._test_main_missing_keyword(keyword)
def test_main_missing_odlip(self):
    # Delegates to the missing-keyword helper for 'odlip'.
    keyword = 'odlip'
    self._test_main_missing_keyword(keyword)
def test_main_missing_odlwebport(self):
    # Delegates to the missing-keyword helper for 'odlwebport'.
    keyword = 'odlwebport'
    self._test_main_missing_keyword(keyword)
def test_main_missing_odlrestconfport(self):
    # Delegates to the missing-keyword helper for 'odlrestconfport'.
    keyword = 'odlrestconfport'
    self._test_main_missing_keyword(keyword)
# set_robotframework_vars is patched on the object under test; main() is
# expected to return EX_RUN_ERROR, and the patched method must have been
# called exactly once with the ODL username and password.
# NOTE(review): the tail of the mock.patch.object(...) call is missing from
# this view (presumably return_value=False, given the test name -- confirm).
202 def test_main_set_robotframework_vars_failed(self):
203 with mock.patch.object(self.test, 'set_robotframework_vars',
205 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR)
206 self.test.set_robotframework_vars.assert_called_once_with(
207 self._odl_username, self._odl_password)
# os.makedirs is patched to raise a plain Exception while
# set_robotframework_vars is patched to return True; the whole scenario
# runs under assertRaises(Exception), i.e. the error is expected to escape.
# NOTE(review): the end of the _test_main(...) call is missing from this
# view.
209 @mock.patch('os.makedirs', side_effect=Exception)
210 def test_main_makedirs_exception(self, mock_method):
211 with mock.patch.object(self.test, 'set_robotframework_vars',
212 return_value=True), \
213 self.assertRaises(Exception):
214 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR,
# os.makedirs is patched to raise OSError; main() is checked through
# _test_main against EX_RUN_ERROR.
# NOTE(review): the tail of the mock.patch.object(...) call and the end of
# the _test_main(...) call are missing from this view.
217 @mock.patch('os.makedirs', side_effect=OSError)
218 def test_main_makedirs_oserror(self, mock_method):
219 with mock.patch.object(self.test, 'set_robotframework_vars',
221 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR,
# robot.run is patched to raise RobotError and the scenario runs under
# assertRaises(RobotError), i.e. the error is expected to escape.
# The builtin open, as seen from the odl module, is replaced by
# mock.mock_open().
# NOTE(review): the tail of the mock.patch.object(odl, 'open', ...) call is
# missing from this view.
224 @mock.patch('robot.run', side_effect=RobotError)
225 @mock.patch('os.makedirs')
226 def test_main_robot_run_failed(self, *args):
227 with mock.patch.object(self.test, 'set_robotframework_vars',
228 return_value=True), \
229 mock.patch.object(odl, 'open', mock.mock_open(),
231 self.assertRaises(RobotError):
232 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR, *args)
# parse_results is patched on the object under test to raise RobotError;
# unlike a robot.run failure, main() is expected here to return
# EX_RUN_ERROR (no assertRaises is used).
# NOTE(review): the tail of the mock.patch.object(odl, 'open', ...) call is
# missing from this view.
234 @mock.patch('robot.run')
235 @mock.patch('os.makedirs')
236 def test_main_parse_results_failed(self, *args):
237 with mock.patch.object(self.test, 'set_robotframework_vars',
238 return_value=True), \
239 mock.patch.object(odl, 'open', mock.mock_open(),
241 mock.patch.object(self.test, 'parse_results',
242 side_effect=RobotError):
243 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR, *args)
@mock.patch('os.remove', side_effect=Exception)
@mock.patch('robot.run')
@mock.patch('os.makedirs')
def test_main_remove_exception(self, *args):
    # os.remove is patched to raise a plain Exception; the scenario runs
    # under assertRaises(Exception), i.e. the error is expected to
    # escape. Decorators are applied bottom-up, so args holds the mocks
    # for os.makedirs, robot.run and os.remove, in that order.
    vars_patch = mock.patch.object(self.test, 'set_robotframework_vars',
                                   return_value=True)
    parse_patch = mock.patch.object(self.test, 'parse_results')
    with vars_patch:
        with parse_patch:
            with self.assertRaises(Exception):
                self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
# Nominal scenario: os.makedirs, robot.run and os.remove are replaced by
# plain mocks, set_robotframework_vars returns True and parse_results is
# patched out; main() is expected to return EX_OK.
# NOTE(review): the tail of the mock.patch.object(odl, 'open', ...) call is
# missing from this view.
255 @mock.patch('os.remove')
256 @mock.patch('robot.run')
257 @mock.patch('os.makedirs')
258 def test_main(self, *args):
259 with mock.patch.object(self.test, 'set_robotframework_vars',
260 return_value=True), \
261 mock.patch.object(odl, 'open', mock.mock_open(),
263 mock.patch.object(self.test, 'parse_results'):
264 self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
# os.makedirs is patched to raise OSError carrying errno.EEXIST (the
# directory already exists); main() is still expected to return EX_OK.
# NOTE(review): the tail of the mock.patch.object(odl, 'open', ...) call is
# missing from this view.
266 @mock.patch('os.remove')
267 @mock.patch('robot.run')
268 @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
269 def test_main_makedirs_oserror17(self, *args):
270 with mock.patch.object(self.test, 'set_robotframework_vars',
271 return_value=True), \
272 mock.patch.object(odl, 'open', mock.mock_open(),
274 mock.patch.object(self.test, 'parse_results'):
275 self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
# robot.run is patched to return 1 instead of raising; main() is still
# expected to return EX_OK.
# NOTE(review): the tail of the mock.patch.object(odl, 'open', ...) call is
# missing from this view.
277 @mock.patch('os.remove')
278 @mock.patch('robot.run', return_value=1)
279 @mock.patch('os.makedirs')
280 def test_main_testcases_in_failure(self, *args):
281 with mock.patch.object(self.test, 'set_robotframework_vars',
282 return_value=True), \
283 mock.patch.object(odl, 'open', mock.mock_open(),
285 mock.patch.object(self.test, 'parse_results'):
286 self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
# os.remove is patched to raise OSError; main() is still expected to
# return EX_OK.
# NOTE(review): the tail of the mock.patch.object(odl, 'open', ...) call is
# missing from this view.
288 @mock.patch('os.remove', side_effect=OSError)
289 @mock.patch('robot.run')
290 @mock.patch('os.makedirs')
291 def test_main_remove_oserror(self, *args):
292 with mock.patch.object(self.test, 'set_robotframework_vars',
293 return_value=True), \
294 mock.patch.object(odl, 'open', mock.mock_open(),
296 mock.patch.object(self.test, 'parse_results'):
297 self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
# With get_endpoint patched to answer through _fake_url_for, run() is
# expected to return EX_RUN_ERROR.
# NOTE(review): `var` is not used in the visible lines; one original line
# is missing from this view (presumably the removal of `var` from
# os.environ, given the helper's name -- confirm).
299 def _test_run_missing_env_var(self, var):
300 with mock.patch('functest.utils.openstack_utils.get_endpoint',
301 side_effect=self._fake_url_for):
303 self.assertEqual(self.test.run(),
304 testcase_base.TestcaseBase.EX_RUN_ERROR)
# Replace main() on the object under test by a Mock, call run() with
# get_endpoint patched to answer through _fake_url_for, then check both the
# value returned by run() and the keyword arguments main() received.
#   status     -- value run() is expected to return
#   exception  -- side effect given to the main() mock
#   odlip      -- expected `odlip` keyword passed to main()
#   odlwebport -- expected `odlwebport` keyword passed to main()
# NOTE(review): two original lines are missing between the two assignments
# to self.test.main; presumably an if/else on `exception` selects one of
# them -- confirm.
306 def _test_run(self, status=testcase_base.TestcaseBase.EX_OK,
307 exception=None, odlip="127.0.0.3", odlwebport="8080"):
308 with mock.patch('functest.utils.openstack_utils.get_endpoint',
309 side_effect=self._fake_url_for):
311 self.test.main = mock.Mock(side_effect=exception)
313 self.test.main = mock.Mock(return_value=status)
314 self.assertEqual(self.test.run(), status)
# main() must have been called exactly once with the complete keyword set.
315 self.test.main.assert_called_once_with(
316 keystoneip=self._keystone_ip, neutronip=self._neutron_ip,
317 odlip=odlip, odlpassword=self._odl_password,
318 odlrestconfport=self._odl_restconfport,
319 odlusername=self._odl_username, odlwebport=odlwebport,
320 ospassword=self._os_password, ostenantname=self._os_tenantname,
321 osusername=self._os_username)
def test_run_exception(self):
    # get_endpoint is patched to raise MissingAuthPlugin; run() is
    # expected to answer EX_RUN_ERROR.
    failing_endpoint = mock.patch(
        'functest.utils.openstack_utils.get_endpoint',
        side_effect=auth_plugins.MissingAuthPlugin())
    with failing_endpoint:
        outcome = self.test.run()
        self.assertEqual(outcome, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_missing_os_username(self):
    # Delegates to the missing-variable helper for OS_USERNAME.
    env_var = "OS_USERNAME"
    self._test_run_missing_env_var(env_var)
def test_run_missing_os_password(self):
    # Delegates to the missing-variable helper for OS_PASSWORD.
    env_var = "OS_PASSWORD"
    self._test_run_missing_env_var(env_var)
def test_run_missing_os_tenant_name(self):
    # Delegates to the missing-variable helper for OS_TENANT_NAME.
    env_var = "OS_TENANT_NAME"
    self._test_run_missing_env_var(env_var)
def test_run_main_false(self):
    # main() is stubbed by _test_run to return EX_RUN_ERROR; run() is
    # expected to hand that value back.
    controller_ip = self._sdn_controller_ip
    os.environ["SDN_CONTROLLER_IP"] = controller_ip
    self._test_run(testcase_base.TestcaseBase.EX_RUN_ERROR,
                   odlip=controller_ip,
                   odlwebport=self._odl_webport)
def test_run_main_exception(self):
    # _test_run stubs main() with a mock raising Exception; the whole
    # scenario runs under assertRaises, i.e. the error is expected to
    # escape from run().
    with self.assertRaises(Exception):
        os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
        run_kwargs = {
            'status': testcase_base.TestcaseBase.EX_RUN_ERROR,
            'exception': Exception(),
            'odlip': self._sdn_controller_ip,
            'odlwebport': self._odl_webport,
        }
        self._test_run(**run_kwargs)
def test_run_missing_sdn_controller_ip(self):
    # This test exports no controller address; with get_endpoint answered
    # by _fake_url_for, run() is expected to return EX_RUN_ERROR.
    endpoint_patch = mock.patch(
        'functest.utils.openstack_utils.get_endpoint',
        side_effect=self._fake_url_for)
    with endpoint_patch:
        outcome = self.test.run()
        self.assertEqual(outcome, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_without_installer_type(self):
    # Only SDN_CONTROLLER_IP is exported by this test; main() is expected
    # to receive that address together with the class-level web port.
    controller_ip = self._sdn_controller_ip
    os.environ["SDN_CONTROLLER_IP"] = controller_ip
    self._test_run(testcase_base.TestcaseBase.EX_OK,
                   odlip=controller_ip,
                   odlwebport=self._odl_webport)
def test_run_fuel(self):
    # INSTALLER_TYPE=fuel: main() is expected to receive the neutron
    # address as odlip and '8282' as web port.
    os.environ["INSTALLER_TYPE"] = "fuel"
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._neutron_ip,
                   odlwebport='8282')
def test_run_apex_missing_sdn_controller_ip(self):
    # INSTALLER_TYPE=apex while this test exports no SDN_CONTROLLER_IP:
    # run() is expected to return EX_RUN_ERROR.
    endpoint_patch = mock.patch(
        'functest.utils.openstack_utils.get_endpoint',
        side_effect=self._fake_url_for)
    with endpoint_patch:
        os.environ["INSTALLER_TYPE"] = "apex"
        outcome = self.test.run()
        self.assertEqual(outcome, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_apex(self):
    # INSTALLER_TYPE=apex with SDN_CONTROLLER_IP exported: main() is
    # expected to receive that address as odlip and '8181' as web port.
    controller_ip = self._sdn_controller_ip
    os.environ["SDN_CONTROLLER_IP"] = controller_ip
    os.environ["INSTALLER_TYPE"] = "apex"
    self._test_run(testcase_base.TestcaseBase.EX_OK,
                   odlip=controller_ip,
                   odlwebport='8181')
def test_run_joid_missing_sdn_controller(self):
    # INSTALLER_TYPE=joid while this test exports no SDN_CONTROLLER:
    # run() is expected to return EX_RUN_ERROR.
    endpoint_patch = mock.patch(
        'functest.utils.openstack_utils.get_endpoint',
        side_effect=self._fake_url_for)
    with endpoint_patch:
        os.environ["INSTALLER_TYPE"] = "joid"
        outcome = self.test.run()
        self.assertEqual(outcome, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_joid(self):
    # INSTALLER_TYPE=joid reads the controller address from
    # SDN_CONTROLLER; main() is expected to receive it as odlip, with
    # '8080' as web port.
    controller_ip = self._sdn_controller_ip
    os.environ["SDN_CONTROLLER"] = controller_ip
    os.environ["INSTALLER_TYPE"] = "joid"
    self._test_run(testcase_base.TestcaseBase.EX_OK,
                   odlip=controller_ip,
                   odlwebport='8080')
def test_run_compass(self, *args):
    # INSTALLER_TYPE=compass: main() is expected to receive the neutron
    # address as odlip and '8181' as web port.
    os.environ["INSTALLER_TYPE"] = "compass"
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._neutron_ip,
                   odlwebport='8181')
if __name__ == "__main__":
    # Direct execution: run every test in this module; verbosity=2 makes
    # unittest report each test individually.
    unittest.main(verbosity=2)