5 from copy import deepcopy
6 from datetime import datetime
8 from opnfv_testapi.common import message
9 import opnfv_testapi.resources.scenario_models as models
10 from opnfv_testapi.tests.unit.resources import test_base as base
13 def _none_default(check, default):
14 return check if check else default
17 class TestScenarioBase(base.TestBase):
19 super(TestScenarioBase, self).setUp()
20 self.get_res = models.Scenario
21 self.list_res = models.Scenarios
22 self.basePath = '/api/v1/scenarios'
23 self.req_d = self._load_request('scenario-c1.json')
24 self.req_2 = self._load_request('scenario-c2.json')
29 def assert_body(self, project, req=None):
33 def _load_request(f_req):
34 abs_file = os.path.join(os.path.dirname(__file__), f_req)
35 with open(abs_file, 'r') as f:
def create_return_name(self, req):
    """Create a scenario from ``req`` and return its name.

    The name is taken as the last ``/``-separated segment of the ``href``
    on the second element returned by ``self.create``.
    """
    _status, created = self.create(req)
    # Only the trailing path segment of the href is needed.
    return created.href.rsplit('/', 1)[-1]
44 def assert_res(self, code, scenario, req=None):
45 self.assertEqual(code, httplib.OK)
48 self.assertIsNotNone(scenario._id)
49 self.assertIsNotNone(scenario.creation_date)
50 self.assertEqual(scenario, models.Scenario.from_dict(req))
53 def _set_query(*args):
def _get_and_assert(self, name, req=None):
    """Fetch the scenario called ``name`` and validate it with ``assert_res``.

    ``req`` is forwarded unchanged to ``assert_res``.
    """
    status, scenario = self.get(name)
    self.assert_res(status, scenario, req=req)
64 class TestScenarioCreate(TestScenarioBase):
def test_withoutBody(self):
    """Calling ``create`` with no request must answer BAD_REQUEST."""
    status, _ = self.create()
    self.assertEqual(status, httplib.BAD_REQUEST)
def test_emptyName(self):
    """A create request built with an empty name must be rejected.

    Expects BAD_REQUEST and the "missing name" message in the body.
    """
    status, payload = self.create(models.ScenarioCreateRequest(''))
    self.assertEqual(status, httplib.BAD_REQUEST)
    expected_error = message.missing('name')
    self.assertIn(expected_error, payload)
def test_noneName(self):
    """A create request built with ``None`` as name must be rejected.

    Expects BAD_REQUEST and the "missing name" message in the body.
    """
    status, payload = self.create(models.ScenarioCreateRequest(None))
    self.assertEqual(status, httplib.BAD_REQUEST)
    expected_error = message.missing('name')
    self.assertIn(expected_error, payload)
def test_success(self):
    """Create through ``create_d``; expect OK and a valid create body."""
    status, payload = self.create_d()
    self.assertEqual(status, httplib.OK)
    self.assert_create_body(payload)
86 def test_alreadyExist(self):
88 (code, body) = self.create_d()
89 self.assertEqual(code, httplib.FORBIDDEN)
90 self.assertIn(message.exist_base, body)
93 class TestScenarioGet(TestScenarioBase):
95 super(TestScenarioGet, self).setUp()
96 self.scenario_1 = self.create_return_name(self.req_d)
97 self.scenario_2 = self.create_return_name(self.req_2)
def test_getByName(self):
    """Fetch ``self.scenario_1`` by name and compare it with ``self.req_d``."""
    name = self.scenario_1
    self._get_and_assert(name, req=self.req_d)
def test_getAll(self):
    """Query without a filter; both ``req_d`` and ``req_2`` are expected."""
    expected = [self.req_d, self.req_2]
    self._query_and_assert(query=None, reqs=expected)
def test_queryName(self):
    """Filter by ``name=nosdn-nofeature-ha``; only ``req_d`` is expected."""
    name_filter = self._set_query('name=nosdn-nofeature-ha')
    expected = [self.req_d]
    self._query_and_assert(name_filter, reqs=expected)
def test_queryInstaller(self):
    """Filter by ``installer=apex``; only ``req_d`` is expected."""
    installer_filter = self._set_query('installer=apex')
    expected = [self.req_d]
    self._query_and_assert(installer_filter, reqs=expected)
def test_queryVersion(self):
    """Filter by ``version=master``; only ``req_d`` is expected."""
    version_filter = self._set_query('version=master')
    expected = [self.req_d]
    self._query_and_assert(version_filter, reqs=expected)
def test_queryProject(self):
    """Filter by ``project=functest``; ``req_d`` and ``req_2`` are expected."""
    project_filter = self._set_query('project=functest')
    expected = [self.req_d, self.req_2]
    self._query_and_assert(project_filter, reqs=expected)
# Disabled because it fails randomly; re-enable once that is fixed in a separate patch.
122 # def test_queryCombination(self):
123 # query = self._set_query('name=nosdn-nofeature-ha',
126 # 'project=functest')
128 # self._query_and_assert(query, reqs=[self.req_d])
130 def _query_and_assert(self, query, found=True, reqs=None):
131 code, body = self.query(query)
133 self.assertEqual(code, httplib.OK)
134 self.assertEqual(0, len(body.scenarios))
136 self.assertEqual(len(reqs), len(body.scenarios))
138 for scenario in body.scenarios:
139 if req['name'] == scenario.name:
140 self.assert_res(code, scenario, req)
class TestScenarioDelete(TestScenarioBase):
    """Tests for deleting a scenario by name."""

    def test_notFound(self):
        """Deleting a name this test never created must answer NOT_FOUND."""
        status, _ = self.delete('notFound')
        self.assertEqual(status, httplib.NOT_FOUND)

    def test_success(self):
        """Create a scenario, delete it, then confirm it can no longer be fetched."""
        name = self.create_return_name(self.req_d)

        delete_status, _ = self.delete(name)
        self.assertEqual(delete_status, httplib.OK)

        # A follow-up GET on the same name must no longer find it.
        get_status, _ = self.get(name)
        self.assertEqual(get_status, httplib.NOT_FOUND)
156 class TestScenarioUpdate(TestScenarioBase):
158 super(TestScenarioUpdate, self).setUp()
159 self.scenario = self.create_return_name(self.req_d)
160 self.scenario_2 = self.create_return_name(self.req_2)
162 self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario)
163 self.installer = self.req_d['installers'][0]['installer']
164 self.version = self.req_d['installers'][0]['versions'][0]['version']
165 self.locate_project = 'installer={}&version={}&project={}'.format(
170 def update_url_fixture(item):
171 def _update_url_fixture(xstep):
172 def wrapper(self, *args, **kwargs):
174 if item in ['projects', 'owner']:
175 locator = 'installer={}&version={}'.format(
178 elif item in ['versions']:
179 locator = 'installer={}'.format(
182 self.update_url = '{}/{}?{}'.format(self.scenario_url,
185 xstep(self, *args, **kwargs)
187 return _update_url_fixture
189 def update_partial(operate, expected):
190 def _update_partial(set_update):
191 @functools.wraps(set_update)
193 update, scenario = set_update(self, deepcopy(self.req_d))
194 code, body = getattr(self, operate)(update, self.scenario)
195 getattr(self, expected)(code, scenario)
197 return _update_partial
199 @update_partial('_add', '_success')
200 def test_addScore(self, scenario):
201 add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
202 projects = scenario['installers'][0]['versions'][0]['projects']
203 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
204 functest['scores'].append(add.format())
205 self.update_url = '{}/scores?{}'.format(self.scenario_url,
210 @update_partial('_add', '_success')
211 def test_addTrustIndicator(self, scenario):
212 add = models.ScenarioTI(date=str(datetime.now()), status='gold')
213 projects = scenario['installers'][0]['versions'][0]['projects']
214 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
215 functest['trust_indicators'].append(add.format())
216 self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url,
221 @update_partial('_add', '_success')
222 def test_addCustoms(self, scenario):
223 add = ['odl', 'parser', 'vping_ssh']
224 projects = scenario['installers'][0]['versions'][0]['projects']
225 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
226 functest['customs'] = list(set(functest['customs'] + add))
227 self.update_url = '{}/customs?{}'.format(self.scenario_url,
231 @update_partial('_update', '_success')
232 def test_updateCustoms(self, scenario):
233 news = ['odl', 'parser', 'vping_ssh']
234 projects = scenario['installers'][0]['versions'][0]['projects']
235 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
236 functest['customs'] = news
237 self.update_url = '{}/customs?{}'.format(self.scenario_url,
240 return news, scenario
242 @update_partial('_delete', '_success')
243 def test_deleteCustoms(self, scenario):
244 obsoletes = ['vping_ssh']
245 projects = scenario['installers'][0]['versions'][0]['projects']
246 functest = filter(lambda f: f['project'] == 'functest', projects)[0]
247 functest['customs'] = ['healthcheck']
248 self.update_url = '{}/customs?{}'.format(self.scenario_url,
251 return obsoletes, scenario
@update_url_fixture('projects')
@update_partial('_add', '_success')
def test_addProjects_succ(self, scenario):
    """Add a new ``qtip`` project to the first installer/version.

    Returns the update payload (a one-element list) together with the
    scenario as it is expected to look afterwards; the decorators consume
    that pair.
    """
    new_project = models.ScenarioProject(project='qtip').format()
    target_version = scenario['installers'][0]['versions'][0]
    target_version['projects'].append(new_project)
    return [new_project], scenario
@update_url_fixture('projects')
@update_partial('_add', '_conflict')
def test_addProjects_already_exist(self, scenario):
    """Try to add a ``functest`` project; the decorators expect a conflict.

    Returns the update payload (a one-element list) and the mutated
    scenario copy.
    """
    duplicate = models.ScenarioProject(project='functest').format()
    target_version = scenario['installers'][0]['versions'][0]
    target_version['projects'].append(duplicate)
    return [duplicate], scenario
267 @update_url_fixture('projects')
268 @update_partial('_add', '_bad_request')
269 def test_addProjects_bad_schema(self, scenario):
270 add = models.ScenarioProject(project='functest').format()
272 scenario['installers'][0]['versions'][0]['projects'].append(add)
273 return [add], scenario
@update_url_fixture('projects')
@update_partial('_update', '_success')
def test_updateProjects_succ(self, scenario):
    """Replace the project list of the first installer/version with ``qtip``.

    Returns the update payload and the scenario as expected afterwards.
    """
    replacement = models.ScenarioProject(project='qtip').format()
    target_version = scenario['installers'][0]['versions'][0]
    target_version['projects'] = [replacement]
    return [replacement], scenario
282 @update_url_fixture('projects')
283 @update_partial('_update', '_conflict')
284 def test_updateProjects_duplicated(self, scenario):
285 update1 = models.ScenarioProject(project='qtip').format()
286 update2 = models.ScenarioProject(project='qtip').format()
287 scenario['installers'][0]['versions'][0]['projects'] = [update1,
289 return [update1, update2], scenario
@update_url_fixture('projects')
@update_partial('_update', '_bad_request')
def test_updateProjects_bad_schema(self, scenario):
    """Send a project update carrying an extra ``score`` key.

    The decorators expect the request to be answered as a bad request.
    """
    malformed = models.ScenarioProject(project='functest').format()
    # Extra key set on the formatted project before it is sent.
    malformed['score'] = None
    target_version = scenario['installers'][0]['versions'][0]
    target_version['projects'] = [malformed]
    return [malformed], scenario
299 @update_url_fixture('projects')
300 @update_partial('_delete', '_success')
301 def test_deleteProjects(self, scenario):
302 deletes = ['functest']
303 projects = scenario['installers'][0]['versions'][0]['projects']
304 scenario['installers'][0]['versions'][0]['projects'] = filter(
305 lambda f: f['project'] != 'functest',
307 return deletes, scenario
@update_url_fixture('owner')
@update_partial('_update', '_success')
def test_changeOwner(self, scenario):
    """Change the owner of the first installer/version to ``new_owner``.

    Returns the formatted change-owner request and the scenario as it is
    expected to look afterwards.
    """
    owner = 'new_owner'
    payload = models.ScenarioChangeOwnerRequest(owner).format()
    target_version = scenario['installers'][0]['versions'][0]
    target_version['owner'] = owner
    return payload, scenario
@update_url_fixture('versions')
@update_partial('_add', '_success')
def test_addVersions_succ(self, scenario):
    """Add a new ``Euphrates`` version to the first installer.

    Returns the update payload (a one-element list) and the scenario as
    expected afterwards.
    """
    new_version = models.ScenarioVersion(version='Euphrates').format()
    first_installer = scenario['installers'][0]
    first_installer['versions'].append(new_version)
    return [new_version], scenario
@update_url_fixture('versions')
@update_partial('_add', '_conflict')
def test_addVersions_already_exist(self, scenario):
    """Try to add a ``master`` version; the decorators expect a conflict.

    Returns the update payload (a one-element list) and the mutated
    scenario copy.
    """
    duplicate = models.ScenarioVersion(version='master').format()
    first_installer = scenario['installers'][0]
    first_installer['versions'].append(duplicate)
    return [duplicate], scenario
@update_url_fixture('versions')
@update_partial('_add', '_bad_request')
def test_addVersions_bad_schema(self, scenario):
    """Add a version carrying an extra ``notexist`` key.

    The decorators expect the request to be answered as a bad request.
    """
    malformed = models.ScenarioVersion(version='euphrates').format()
    # Extra key set on the formatted version before it is sent.
    malformed['notexist'] = None
    first_installer = scenario['installers'][0]
    first_installer['versions'].append(malformed)
    return [malformed], scenario
@update_url_fixture('versions')
@update_partial('_update', '_success')
def test_updateVersions_succ(self, scenario):
    """Replace the version list of the first installer with ``euphrates``.

    Returns the update payload and the scenario as expected afterwards.
    """
    replacement = models.ScenarioVersion(version='euphrates').format()
    first_installer = scenario['installers'][0]
    first_installer['versions'] = [replacement]
    return [replacement], scenario
@update_url_fixture('versions')
@update_partial('_update', '_conflict')
def test_updateVersions_duplicated(self, scenario):
    """Update with two identical ``euphrates`` versions; a conflict is expected.

    Returns the two-element update payload and the mutated scenario copy.
    """
    first = models.ScenarioVersion(version='euphrates').format()
    second = models.ScenarioVersion(version='euphrates').format()
    payload = [first, second]
    # The scenario gets its own list object, separate from the payload.
    scenario['installers'][0]['versions'] = list(payload)
    return payload, scenario
@update_url_fixture('versions')
@update_partial('_update', '_bad_request')
def test_updateVersions_bad_schema(self, scenario):
    """Send a version update carrying an extra ``not_owner`` key.

    The decorators expect the request to be answered as a bad request.
    """
    malformed = models.ScenarioVersion(version='euphrates').format()
    # Extra key set on the formatted version before it is sent.
    malformed['not_owner'] = 'Iam'
    first_installer = scenario['installers'][0]
    first_installer['versions'] = [malformed]
    return [malformed], scenario
362 @update_url_fixture('versions')
363 @update_partial('_delete', '_success')
364 def test_deleteVersions(self, scenario):
366 versions = scenario['installers'][0]['versions']
367 scenario['installers'][0]['versions'] = filter(
368 lambda f: f['version'] != 'master',
370 return deletes, scenario
def _add(self, update_req, new_scenario):
    """POST ``update_req`` to ``self.update_url`` and return the result.

    ``new_scenario`` is accepted so all operation helpers share one
    signature; it is not used here.
    """
    target = self.update_url
    return self.post_direct_url(target, update_req)
def _update(self, update_req, new_scenario):
    """Send ``update_req`` to ``self.update_url`` via ``update_direct_url``.

    ``new_scenario`` is accepted so all operation helpers share one
    signature; it is not used here.
    """
    target = self.update_url
    return self.update_direct_url(target, update_req)
def _delete(self, update_req, new_scenario):
    """Send ``update_req`` to ``self.update_url`` via ``delete_direct_url``.

    ``new_scenario`` is accepted so all operation helpers share one
    signature; it is not used here.
    """
    target = self.update_url
    return self.delete_direct_url(target, update_req)
def _success(self, status, new_scenario):
    """Assert ``status`` is OK and the stored scenario matches ``new_scenario``.

    The scenario is fetched again by the ``name`` key of ``new_scenario``.
    """
    self.assertEqual(status, httplib.OK)
    scenario_name = new_scenario.get('name')
    self._get_and_assert(scenario_name, new_scenario)
def _forbidden(self, status, new_scenario):
    """Assert that ``status`` is FORBIDDEN; ``new_scenario`` is unused."""
    expected_status = httplib.FORBIDDEN
    self.assertEqual(status, expected_status)
def _bad_request(self, status, new_scenario):
    """Assert that ``status`` is BAD_REQUEST; ``new_scenario`` is unused."""
    expected_status = httplib.BAD_REQUEST
    self.assertEqual(status, expected_status)
def _conflict(self, status, new_scenario):
    """Assert that ``status`` is CONFLICT; ``new_scenario`` is unused."""
    expected_status = httplib.CONFLICT
    self.assertEqual(status, expected_status)