X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ftest_scenario.py;h=a5a4fb30fa3b07abd7e8ef57590c879c4d7448e3;hb=f36fc889cfc92f50a4f053661f9ea841e4a5b045;hp=ff59795244bbddfe6c9c766093535da4c95f9076;hpb=04156d074c1d83e86e1c6d7c15dd077b17c86dab;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py index ff5979524..a5a4fb30f 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py @@ -1,20 +1,19 @@ +from copy import deepcopy +from datetime import datetime +import functools import json import os -from opnfv_testapi.common.constants import HTTP_BAD_REQUEST -from opnfv_testapi.common.constants import HTTP_FORBIDDEN -from opnfv_testapi.common.constants import HTTP_OK -from opnfv_testapi.resources.scenario_models import Scenario -from opnfv_testapi.resources.scenario_models import ScenarioCreateRequest -from opnfv_testapi.resources.scenario_models import Scenarios -from test_testcase import TestBase +from opnfv_testapi.common import constants +import opnfv_testapi.resources.scenario_models as models +import test_base as base -class TestScenarioBase(TestBase): +class TestScenarioBase(base.TestBase): def setUp(self): super(TestScenarioBase, self).setUp() - self.get_res = Scenario - self.list_res = Scenarios + self.get_res = models.Scenario + self.list_res = models.Scenarios self.basePath = '/api/v1/scenarios' self.req_d = self._load_request('scenario-c1.json') self.req_2 = self._load_request('scenario-c2.json') @@ -38,41 +37,52 @@ class TestScenarioBase(TestBase): return res.href.split('/')[-1] def assert_res(self, code, scenario, req=None): - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, constants.HTTP_OK) if req is None: req = self.req_d - scenario_dict = scenario.format_http() - self.assertIsNotNone(scenario_dict['_id']) - self.assertIsNotNone(scenario_dict['creation_date']) - self.assertDictContainsSubset(req, scenario_dict) + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + + scenario == models.Scenario.from_dict(req) + + @staticmethod + def _set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def _get_and_assert(self, name, req=None): + code, body = self.get(name) + self.assert_res(code, body, req) class TestScenarioCreate(TestScenarioBase): def test_withoutBody(self): (code, body) = self.create() - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, constants.HTTP_BAD_REQUEST) def test_emptyName(self): - req_empty = ScenarioCreateRequest('') + req_empty = models.ScenarioCreateRequest('') (code, body) = self.create(req_empty) - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, constants.HTTP_BAD_REQUEST) self.assertIn('name missing', body) def test_noneName(self): - req_none = ScenarioCreateRequest(None) + req_none = models.ScenarioCreateRequest(None) (code, body) = self.create(req_none) - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, constants.HTTP_BAD_REQUEST) self.assertIn('name missing', body) def test_success(self): (code, body) = self.create_d() - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, constants.HTTP_OK) self.assert_create_body(body) def test_alreadyExist(self): self.create_d() (code, body) = self.create_d() - self.assertEqual(code, HTTP_FORBIDDEN) + self.assertEqual(code, constants.HTTP_FORBIDDEN) self.assertIn('already exists', body) @@ -83,8 +93,7 @@ class TestScenarioGet(TestScenarioBase): self.scenario_2 = self.create_return_name(self.req_2) def test_getByName(self): - code, body = self.get(self.scenario_1) - self.assert_res(code, body, req=self.req_d) + self._get_and_assert(self.scenario_1, self.req_d) def test_getAll(self): self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) @@ -113,17 +122,10 @@ class TestScenarioGet(TestScenarioBase): self._query_and_assert(query, reqs=[self.req_d]) - @staticmethod - def _set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - def _query_and_assert(self, query, found=True, reqs=None): code, body = self.query(query) if not found: - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, constants.HTTP_OK) self.assertEqual(0, len(body.scenarios)) else: self.assertEqual(len(reqs), len(body.scenarios)) @@ -131,3 +133,186 @@ class TestScenarioGet(TestScenarioBase): for scenario in body.scenarios: if req['name'] == scenario.name: self.assert_res(code, scenario, req) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + + def _execute(set_update): + @functools.wraps(set_update) + def magic(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + self._update_and_assert(update, scenario) + return magic + + def test_renameScenario(self): + new_name = 'nosdn-nofeature-noha' + new_scenario = deepcopy(self.req_d) + new_scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + self._update_and_assert(update_req, new_scenario, new_name) + + @_execute + def test_addInstaller(self, scenario): + add = models.ScenarioInstaller(installer='daisy', versions=list()) + scenario['installers'].append(add.format()) + update = models.ScenarioUpdateRequest(field='installer', + op='add', + locate={}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteInstaller(self, scenario): + scenario['installers'] = filter(lambda f: f['installer'] != 'apex', + scenario['installers']) + + update = models.ScenarioUpdateRequest(field='installer', + op='delete', + locate={'installer': 'apex'}) + return update, scenario + + @_execute + def test_addVersion(self, scenario): + add = models.ScenarioVersion(version='danube', projects=list()) + scenario['installers'][0]['versions'].append(add.format()) + update = models.ScenarioUpdateRequest(field='version', + op='add', + locate={'installer': 'apex'}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteVersion(self, scenario): + scenario['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + scenario['installers'][0]['versions']) + + update = models.ScenarioUpdateRequest(field='version', + op='delete', + locate={'installer': 'apex', + 'version': 'master'}) + return update, scenario + + @_execute + def test_changeOwner(self, scenario): + scenario['installers'][0]['versions'][0]['owner'] = 'lucy' + + update = models.ScenarioUpdateRequest(field='owner', + op='update', + locate={'installer': 'apex', + 'version': 'master'}, + term={'owner': 'lucy'}) + return update, scenario + + @_execute + def test_addProject(self, scenario): + add = models.ScenarioProject(project='qtip').format() + scenario['installers'][0]['versions'][0]['projects'].append(add) + update = models.ScenarioUpdateRequest(field='project', + op='add', + locate={'installer': 'apex', + 'version': 'master'}, + term=add) + return update, scenario + + @_execute + def test_deleteProject(self, scenario): + scenario['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + scenario['installers'][0]['versions'][0]['projects']) + + update = models.ScenarioUpdateRequest(field='project', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}) + return update, scenario + + @_execute + def test_addCustoms(self, scenario): + add = ['odl', 'parser', 'vping_ssh'] + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] + update = models.ScenarioUpdateRequest(field='customs', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add) + return update, scenario + + @_execute + def test_deleteCustoms(self, scenario): + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + update = models.ScenarioUpdateRequest(field='customs', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=['vping_ssh']) + return update, scenario + + @_execute + def test_addScore(self, scenario): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + update = models.ScenarioUpdateRequest(field='score', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + @_execute + def test_addTi(self, scenario): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + update = models.ScenarioUpdateRequest(field='trust_indicator', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + def _update_and_assert(self, update_req, new_scenario, name=None): + code, _ = self.update(update_req, self.scenario) + self.assertEqual(code, constants.HTTP_OK) + self._get_and_assert(self._none_default(name, self.scenario), + new_scenario) + + @staticmethod + def _none_default(check, default): + return check if check else default + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, constants.HTTP_NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, constants.HTTP_OK) + code, _ = self.get(scenario) + self.assertEqual(code, constants.HTTP_NOT_FOUND)