X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ftest_scenario.py;h=a5a4fb30fa3b07abd7e8ef57590c879c4d7448e3;hb=f36fc889cfc92f50a4f053661f9ea841e4a5b045;hp=8e827813ce9a090c0ded4d666cde8c117cb431a9;hpb=6d80e4d8e3c6d5ccfa2c186786c0dc7e44c5ddbc;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py index 8e827813c..a5a4fb30f 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py @@ -1,18 +1,22 @@ +from copy import deepcopy +from datetime import datetime +import functools import json import os -from opnfv_testapi.common.constants import HTTP_BAD_REQUEST -from opnfv_testapi.common.constants import HTTP_FORBIDDEN -from opnfv_testapi.common.constants import HTTP_OK -from opnfv_testapi.resources.scenario_models import ScenarioCreateRequest -from test_testcase import TestBase +from opnfv_testapi.common import constants +import opnfv_testapi.resources.scenario_models as models +import test_base as base -class TestScenarioBase(TestBase): +class TestScenarioBase(base.TestBase): def setUp(self): super(TestScenarioBase, self).setUp() + self.get_res = models.Scenario + self.list_res = models.Scenarios self.basePath = '/api/v1/scenarios' - self.load_request('scenario-create.json') + self.req_d = self._load_request('scenario-c1.json') + self.req_2 = self._load_request('scenario-c2.json') def tearDown(self): pass @@ -20,36 +24,295 @@ class TestScenarioBase(TestBase): def assert_body(self, project, req=None): pass - def load_request(self, f_req): - with open(os.path.join(os.path.dirname(__file__), f_req), 'r') as f: - self.req_d = json.dumps(json.load(f)) + @staticmethod + def _load_request(f_req): + abs_file = os.path.join(os.path.dirname(__file__), f_req) + with open(abs_file, 'r') as f: + loader = json.load(f) f.close() + return loader + + def create_return_name(self, req): + _, res = self.create(req) + return res.href.split('/')[-1] + + def assert_res(self, code, scenario, req=None): + self.assertEqual(code, constants.HTTP_OK) + if req is None: + req = self.req_d + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + + scenario == models.Scenario.from_dict(req) + + @staticmethod + def _set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def _get_and_assert(self, name, req=None): + code, body = self.get(name) + self.assert_res(code, body, req) class TestScenarioCreate(TestScenarioBase): def test_withoutBody(self): (code, body) = self.create() - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, constants.HTTP_BAD_REQUEST) def test_emptyName(self): - req_empty = ScenarioCreateRequest('') + req_empty = models.ScenarioCreateRequest('') (code, body) = self.create(req_empty) - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, constants.HTTP_BAD_REQUEST) self.assertIn('name missing', body) def test_noneName(self): - req_none = ScenarioCreateRequest(None) + req_none = models.ScenarioCreateRequest(None) (code, body) = self.create(req_none) - self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertEqual(code, constants.HTTP_BAD_REQUEST) self.assertIn('name missing', body) def test_success(self): (code, body) = self.create_d() - self.assertEqual(code, HTTP_OK) + self.assertEqual(code, constants.HTTP_OK) self.assert_create_body(body) def test_alreadyExist(self): self.create_d() (code, body) = self.create_d() - self.assertEqual(code, HTTP_FORBIDDEN) + self.assertEqual(code, constants.HTTP_FORBIDDEN) self.assertIn('already exists', body) + + +class TestScenarioGet(TestScenarioBase): + def setUp(self): + super(TestScenarioGet, self).setUp() + self.scenario_1 = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def test_getByName(self): + self._get_and_assert(self.scenario_1, self.req_d) + + def test_getAll(self): + self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) + + def test_queryName(self): + query = self._set_query('name=nosdn-nofeature-ha') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryInstaller(self): + query = self._set_query('installer=apex') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryVersion(self): + query = self._set_query('version=master') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryProject(self): + query = self._set_query('project=functest') + self._query_and_assert(query, reqs=[self.req_d, self.req_2]) + + def test_queryCombination(self): + query = self._set_query('name=nosdn-nofeature-ha', + 'installer=apex', + 'version=master', + 'project=functest') + + self._query_and_assert(query, reqs=[self.req_d]) + + def _query_and_assert(self, query, found=True, reqs=None): + code, body = self.query(query) + if not found: + self.assertEqual(code, constants.HTTP_OK) + self.assertEqual(0, len(body.scenarios)) + else: + self.assertEqual(len(reqs), len(body.scenarios)) + for req in reqs: + for scenario in body.scenarios: + if req['name'] == scenario.name: + self.assert_res(code, scenario, req) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + + def _execute(set_update): + @functools.wraps(set_update) + def magic(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + self._update_and_assert(update, scenario) + return magic + + def test_renameScenario(self): + new_name = 'nosdn-nofeature-noha' + new_scenario = deepcopy(self.req_d) + new_scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + self._update_and_assert(update_req, new_scenario, new_name) + + @_execute + def test_addInstaller(self, scenario): + add = models.ScenarioInstaller(installer='daisy', versions=list()) + scenario['installers'].append(add.format()) + update = models.ScenarioUpdateRequest(field='installer', + op='add', + locate={}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteInstaller(self, scenario): + scenario['installers'] = filter(lambda f: f['installer'] != 'apex', + scenario['installers']) + + update = models.ScenarioUpdateRequest(field='installer', + op='delete', + locate={'installer': 'apex'}) + return update, scenario + + @_execute + def test_addVersion(self, scenario): + add = models.ScenarioVersion(version='danube', projects=list()) + scenario['installers'][0]['versions'].append(add.format()) + update = models.ScenarioUpdateRequest(field='version', + op='add', + locate={'installer': 'apex'}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteVersion(self, scenario): + scenario['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + scenario['installers'][0]['versions']) + + update = models.ScenarioUpdateRequest(field='version', + op='delete', + locate={'installer': 'apex', + 'version': 'master'}) + return update, scenario + + @_execute + def test_changeOwner(self, scenario): + scenario['installers'][0]['versions'][0]['owner'] = 'lucy' + + update = models.ScenarioUpdateRequest(field='owner', + op='update', + locate={'installer': 'apex', + 'version': 'master'}, + term={'owner': 'lucy'}) + return update, scenario + + @_execute + def test_addProject(self, scenario): + add = models.ScenarioProject(project='qtip').format() + scenario['installers'][0]['versions'][0]['projects'].append(add) + update = models.ScenarioUpdateRequest(field='project', + op='add', + locate={'installer': 'apex', + 'version': 'master'}, + term=add) + return update, scenario + + @_execute + def test_deleteProject(self, scenario): + scenario['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + scenario['installers'][0]['versions'][0]['projects']) + + update = models.ScenarioUpdateRequest(field='project', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}) + return update, scenario + + @_execute + def test_addCustoms(self, scenario): + add = ['odl', 'parser', 'vping_ssh'] + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] + update = models.ScenarioUpdateRequest(field='customs', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add) + return update, scenario + + @_execute + def test_deleteCustoms(self, scenario): + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + update = models.ScenarioUpdateRequest(field='customs', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=['vping_ssh']) + return update, scenario + + @_execute + def test_addScore(self, scenario): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + update = models.ScenarioUpdateRequest(field='score', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + @_execute + def test_addTi(self, scenario): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + update = models.ScenarioUpdateRequest(field='trust_indicator', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + def _update_and_assert(self, update_req, new_scenario, name=None): + code, _ = self.update(update_req, self.scenario) + self.assertEqual(code, constants.HTTP_OK) + self._get_and_assert(self._none_default(name, self.scenario), + new_scenario) + + @staticmethod + def _none_default(check, default): + return check if check else default + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, constants.HTTP_NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, constants.HTTP_OK) + code, _ = self.get(scenario) + self.assertEqual(code, constants.HTTP_NOT_FOUND)