X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ftest_scenario.py;h=ab2c34b31fa7671dafd34db5e875f9a7c447a416;hb=6f84a256bbe92cf3f9f746ca0972abd9942bad43;hp=ff59795244bbddfe6c9c766093535da4c95f9076;hpb=8cb5166cee8f446b9759d27e77192a19f94bdddc;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py index ff5979524..ab2c34b31 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py @@ -1,20 +1,19 @@ +from copy import deepcopy +from datetime import datetime +import functools +import httplib import json import os -from opnfv_testapi.common.constants import HTTP_BAD_REQUEST -from opnfv_testapi.common.constants import HTTP_FORBIDDEN -from opnfv_testapi.common.constants import HTTP_OK -from opnfv_testapi.resources.scenario_models import Scenario -from opnfv_testapi.resources.scenario_models import ScenarioCreateRequest -from opnfv_testapi.resources.scenario_models import Scenarios -from test_testcase import TestBase +import opnfv_testapi.resources.scenario_models as models +import test_base as base -class TestScenarioBase(TestBase): +class TestScenarioBase(base.TestBase): def setUp(self): super(TestScenarioBase, self).setUp() - self.get_res = Scenario - self.list_res = Scenarios + self.get_res = models.Scenario + self.list_res = models.Scenarios self.basePath = '/api/v1/scenarios' self.req_d = self._load_request('scenario-c1.json') self.req_2 = self._load_request('scenario-c2.json') @@ -38,41 +37,52 @@ class TestScenarioBase(TestBase): return res.href.split('/')[-1] def assert_res(self, code, scenario, req=None): - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, httplib.OK) if req is None: req = self.req_d - scenario_dict = scenario.format_http() - self.assertIsNotNone(scenario_dict['_id']) - self.assertIsNotNone(scenario_dict['creation_date']) - self.assertDictContainsSubset(req, scenario_dict) + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + + scenario == models.Scenario.from_dict(req) + + @staticmethod + def _set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def _get_and_assert(self, name, req=None): + code, body = self.get(name) + self.assert_res(code, body, req) class TestScenarioCreate(TestScenarioBase): def test_withoutBody(self): (code, body) = self.create() - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, httplib.BAD_REQUEST) def test_emptyName(self): - req_empty = ScenarioCreateRequest('') + req_empty = models.ScenarioCreateRequest('') (code, body) = self.create(req_empty) - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, httplib.BAD_REQUEST) self.assertIn('name missing', body) def test_noneName(self): - req_none = ScenarioCreateRequest(None) + req_none = models.ScenarioCreateRequest(None) (code, body) = self.create(req_none) - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, httplib.BAD_REQUEST) self.assertIn('name missing', body) def test_success(self): (code, body) = self.create_d() - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, httplib.OK) self.assert_create_body(body) def test_alreadyExist(self): self.create_d() (code, body) = self.create_d() - self.assertEqual(code, HTTP_FORBIDDEN) + self.assertEqual(code, httplib.FORBIDDEN) self.assertIn('already exists', body) @@ -83,8 +93,7 @@ class TestScenarioGet(TestScenarioBase): self.scenario_2 = self.create_return_name(self.req_2) def test_getByName(self): - code, body = self.get(self.scenario_1) - self.assert_res(code, body, req=self.req_d) + self._get_and_assert(self.scenario_1, self.req_d) def test_getAll(self): self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) @@ -113,17 +122,10 @@ class TestScenarioGet(TestScenarioBase): self._query_and_assert(query, reqs=[self.req_d]) - @staticmethod - def _set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - def _query_and_assert(self, query, found=True, reqs=None): code, body = self.query(query) if not found: - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, httplib.OK) self.assertEqual(0, len(body.scenarios)) else: self.assertEqual(len(reqs), len(body.scenarios)) @@ -131,3 +133,227 @@ class TestScenarioGet(TestScenarioBase): for scenario in body.scenarios: if req['name'] == scenario.name: self.assert_res(code, scenario, req) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def _execute(set_update): + @functools.wraps(set_update) + def magic(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + self._update_and_assert(update, scenario) + return magic + + def _update(expected): + def _update(set_update): + @functools.wraps(set_update) + def wrap(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + code, body = self.update(update, self.scenario) + getattr(self, expected)(code, scenario) + return wrap + return _update + + @_update('_success') + def test_renameScenario(self, scenario): + new_name = 'nosdn-nofeature-noha' + scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + return update_req, scenario + + @_update('_forbidden') + def test_renameScenario_exist(self, scenario): + new_name = self.scenario_2 + scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + return update_req, scenario + + @_update('_bad_request') + def test_renameScenario_noName(self, scenario): + new_name = self.scenario_2 + scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={}) + return update_req, scenario + + @_execute + def test_addInstaller(self, scenario): + add = models.ScenarioInstaller(installer='daisy', versions=list()) + scenario['installers'].append(add.format()) + update = models.ScenarioUpdateRequest(field='installer', + op='add', + locate={}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteInstaller(self, scenario): + scenario['installers'] = filter(lambda f: f['installer'] != 'apex', + scenario['installers']) + + update = models.ScenarioUpdateRequest(field='installer', + op='delete', + locate={'installer': 'apex'}) + return update, scenario + + @_execute + def test_addVersion(self, scenario): + add = models.ScenarioVersion(version='danube', projects=list()) + scenario['installers'][0]['versions'].append(add.format()) + update = models.ScenarioUpdateRequest(field='version', + op='add', + locate={'installer': 'apex'}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteVersion(self, scenario): + scenario['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + scenario['installers'][0]['versions']) + + update = models.ScenarioUpdateRequest(field='version', + op='delete', + locate={'installer': 'apex', + 'version': 'master'}) + return update, scenario + + @_execute + def test_changeOwner(self, scenario): + scenario['installers'][0]['versions'][0]['owner'] = 'lucy' + + update = models.ScenarioUpdateRequest(field='owner', + op='update', + locate={'installer': 'apex', + 'version': 'master'}, + term={'owner': 'lucy'}) + return update, scenario + + @_execute + def test_addProject(self, scenario): + add = models.ScenarioProject(project='qtip').format() + scenario['installers'][0]['versions'][0]['projects'].append(add) + update = models.ScenarioUpdateRequest(field='project', + op='add', + locate={'installer': 'apex', + 'version': 'master'}, + term=add) + return update, scenario + + @_execute + def test_deleteProject(self, scenario): + scenario['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + scenario['installers'][0]['versions'][0]['projects']) + + update = models.ScenarioUpdateRequest(field='project', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}) + return update, scenario + + @_execute + def test_addCustoms(self, scenario): + add = ['odl', 'parser', 'vping_ssh'] + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] + update = models.ScenarioUpdateRequest(field='customs', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add) + return update, scenario + + @_execute + def test_deleteCustoms(self, scenario): + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + update = models.ScenarioUpdateRequest(field='customs', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=['vping_ssh']) + return update, scenario + + @_execute + def test_addScore(self, scenario): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + update = models.ScenarioUpdateRequest(field='score', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + @_execute + def test_addTi(self, scenario): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + update = models.ScenarioUpdateRequest(field='trust_indicator', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + def _update_and_assert(self, update_req, new_scenario, name=None): + code, _ = self.update(update_req, self.scenario) + self.assertEqual(code, httplib.OK) + self._get_and_assert(_none_default(name, self.scenario), + new_scenario) + + def _success(self, status, new_scenario): + self.assertEqual(status, httplib.OK) + self._get_and_assert(new_scenario.get('name'), new_scenario) + + def _forbidden(self, status, new_scenario): + self.assertEqual(status, httplib.FORBIDDEN) + + def _bad_request(self, status, new_scenario): + self.assertEqual(status, httplib.BAD_REQUEST) + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, httplib.NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, httplib.OK) + code, _ = self.get(scenario) + self.assertEqual(code, httplib.NOT_FOUND) + + +def _none_default(check, default): + return check if check else default