X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Ftests%2Funit%2Ftest_scenario.py;h=c15dc32ea6508e1f0bb8d486a36c1e169bd9d55e;hb=f262b532623812aa4002e44bfcce7d392a8f156b;hp=8e827813ce9a090c0ded4d666cde8c117cb431a9;hpb=a83e598c5401531d8f4010f16a7c41461fa4e828;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py index 8e827813c..c15dc32ea 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py @@ -1,18 +1,23 @@ +from copy import deepcopy import json import os +from datetime import datetime from opnfv_testapi.common.constants import HTTP_BAD_REQUEST from opnfv_testapi.common.constants import HTTP_FORBIDDEN from opnfv_testapi.common.constants import HTTP_OK -from opnfv_testapi.resources.scenario_models import ScenarioCreateRequest +import opnfv_testapi.resources.scenario_models as models from test_testcase import TestBase class TestScenarioBase(TestBase): def setUp(self): super(TestScenarioBase, self).setUp() + self.get_res = models.Scenario + self.list_res = models.Scenarios self.basePath = '/api/v1/scenarios' - self.load_request('scenario-create.json') + self.req_d = self._load_request('scenario-c1.json') + self.req_2 = self._load_request('scenario-c2.json') def tearDown(self): pass @@ -20,10 +25,37 @@ class TestScenarioBase(TestBase): def assert_body(self, project, req=None): pass - def load_request(self, f_req): - with open(os.path.join(os.path.dirname(__file__), f_req), 'r') as f: - self.req_d = json.dumps(json.load(f)) + @staticmethod + def _load_request(f_req): + abs_file = os.path.join(os.path.dirname(__file__), f_req) + with open(abs_file, 'r') as f: + loader = json.load(f) f.close() + return loader + + def create_return_name(self, req): + _, res = self.create(req) + return res.href.split('/')[-1] + + def assert_res(self, code, scenario, req=None): + self.assertEqual(code, HTTP_OK) + if req is None: + req = self.req_d + scenario_dict = scenario.format_http() + self.assertIsNotNone(scenario_dict['_id']) + self.assertIsNotNone(scenario_dict['creation_date']) + self.assertDictContainsSubset(req, scenario_dict) + + @staticmethod + def _set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def _get_and_assert(self, name, req=None): + code, body = self.get(name) + self.assert_res(code, body, req) class TestScenarioCreate(TestScenarioBase): @@ -32,13 +64,13 @@ class TestScenarioCreate(TestScenarioBase): self.assertEqual(code, HTTP_BAD_REQUEST) def test_emptyName(self): - req_empty = ScenarioCreateRequest('') + req_empty = models.ScenarioCreateRequest('') (code, body) = self.create(req_empty) self.assertEqual(code, HTTP_BAD_REQUEST) self.assertIn('name missing', body) def test_noneName(self): - req_none = ScenarioCreateRequest(None) + req_none = models.ScenarioCreateRequest(None) (code, body) = self.create(req_none) self.assertEqual(code, HTTP_BAD_REQUEST) self.assertIn('name missing', body) @@ -53,3 +85,221 @@ class TestScenarioCreate(TestScenarioBase): (code, body) = self.create_d() self.assertEqual(code, HTTP_FORBIDDEN) self.assertIn('already exists', body) + + +class TestScenarioGet(TestScenarioBase): + def setUp(self): + super(TestScenarioGet, self).setUp() + self.scenario_1 = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def test_getByName(self): + self._get_and_assert(self.scenario_1, self.req_d) + + def test_getAll(self): + self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) + + def test_queryName(self): + query = self._set_query('name=nosdn-nofeature-ha') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryInstaller(self): + query = self._set_query('installer=apex') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryVersion(self): + query = self._set_query('version=master') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryProject(self): + query = self._set_query('project=functest') + self._query_and_assert(query, reqs=[self.req_d, self.req_2]) + + def test_queryCombination(self): + query = self._set_query('name=nosdn-nofeature-ha', + 'installer=apex', + 'version=master', + 'project=functest') + + self._query_and_assert(query, reqs=[self.req_d]) + + def _query_and_assert(self, query, found=True, reqs=None): + code, body = self.query(query) + if not found: + self.assertEqual(code, HTTP_OK) + self.assertEqual(0, len(body.scenarios)) + else: + self.assertEqual(len(reqs), len(body.scenarios)) + for req in reqs: + for scenario in body.scenarios: + if req['name'] == scenario.name: + self.assert_res(code, scenario, req) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + + def _execute(set_update): + def magic(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + self._update_and_assert(update, scenario) + return magic + + def test_renameScenario(self): + new_name = 'nosdn-nofeature-noha' + new_scenario = deepcopy(self.req_d) + new_scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + self._update_and_assert(update_req, new_scenario, new_name) + + @_execute + def test_addInstaller(self, scenario): + add = models.ScenarioInstaller(installer='daisy', versions=list()) + scenario['installers'].append(add.format()) + update = models.ScenarioUpdateRequest(field='installer', + op='add', + locate={}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteInstaller(self, scenario): + scenario['installers'] = filter(lambda f: f['installer'] != 'apex', + scenario['installers']) + + update = models.ScenarioUpdateRequest(field='installer', + op='delete', + locate={'installer': 'apex'}) + return update, scenario + + @_execute + def test_addVersion(self, scenario): + add = models.ScenarioVersion(version='danube', projects=list()) + scenario['installers'][0]['versions'].append(add.format()) + update = models.ScenarioUpdateRequest(field='version', + op='add', + locate={'installer': 'apex'}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteVersion(self, scenario): + scenario['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + scenario['installers'][0]['versions']) + + update = models.ScenarioUpdateRequest(field='version', + op='delete', + locate={'installer': 'apex', + 'version': 'master'}) + return update, scenario + + @_execute + def test_changeOwner(self, scenario): + scenario['installers'][0]['versions'][0]['owner'] = 'lucy' + + update = models.ScenarioUpdateRequest(field='owner', + op='update', + locate={'installer': 'apex', + 'version': 'master'}, + term={'owner': 'lucy'}) + return update, scenario + + @_execute + def test_addProject(self, scenario): + add = models.ScenarioProject(project='qtip').format() + scenario['installers'][0]['versions'][0]['projects'].append(add) + update = models.ScenarioUpdateRequest(field='project', + op='add', + locate={'installer': 'apex', + 'version': 'master'}, + term=add) + return update, scenario + + @_execute + def test_deleteProject(self, scenario): + scenario['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + scenario['installers'][0]['versions'][0]['projects']) + + update = models.ScenarioUpdateRequest(field='project', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}) + return update, scenario + + @_execute + def test_addCustoms(self, scenario): + add = ['odl', 'parser', 'vping_ssh'] + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] + update = models.ScenarioUpdateRequest(field='customs', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add) + return update, scenario + + @_execute + def test_deleteCustoms(self, scenario): + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + update = models.ScenarioUpdateRequest(field='customs', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=['vping_ssh']) + return update, scenario + + @_execute + def test_addScore(self, scenario): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + update = models.ScenarioUpdateRequest(field='score', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + @_execute + def test_addTi(self, scenario): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + update = models.ScenarioUpdateRequest(field='trust_indicator', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + def _update_and_assert(self, update_req, new_scenario, name=None): + code, _ = self.update(update_req, self.scenario) + self.assertEqual(code, HTTP_OK) + self._get_and_assert(self._none_default(name, self.scenario), + new_scenario) + + @staticmethod + def _none_default(check, default): + return check if check else default