+
+
+class TestScenarioGet(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioGet, self).setUp()
+ self.scenario_1 = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def test_getByName(self):
+ self._get_and_assert(self.scenario_1, self.req_d)
+
+ def test_getAll(self):
+ self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
+
+ def test_queryName(self):
+ query = self._set_query('name=nosdn-nofeature-ha')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryInstaller(self):
+ query = self._set_query('installer=apex')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryVersion(self):
+ query = self._set_query('version=master')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryProject(self):
+ query = self._set_query('project=functest')
+ self._query_and_assert(query, reqs=[self.req_d, self.req_2])
+
+ def test_queryCombination(self):
+ query = self._set_query('name=nosdn-nofeature-ha',
+ 'installer=apex',
+ 'version=master',
+ 'project=functest')
+
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def _query_and_assert(self, query, found=True, reqs=None):
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, constants.HTTP_OK)
+ self.assertEqual(0, len(body.scenarios))
+ else:
+ self.assertEqual(len(reqs), len(body.scenarios))
+ for req in reqs:
+ for scenario in body.scenarios:
+ if req['name'] == scenario.name:
+ self.assert_res(code, scenario, req)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioUpdate, self).setUp()
+ self.scenario = self.create_return_name(self.req_d)
+
+ def _execute(set_update):
+ def magic(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ self._update_and_assert(update, scenario)
+ return magic
+
+ def test_renameScenario(self):
+ new_name = 'nosdn-nofeature-noha'
+ new_scenario = deepcopy(self.req_d)
+ new_scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ self._update_and_assert(update_req, new_scenario, new_name)
+
+ @_execute
+ def test_addInstaller(self, scenario):
+ add = models.ScenarioInstaller(installer='daisy', versions=list())
+ scenario['installers'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='add',
+ locate={},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteInstaller(self, scenario):
+ scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
+ scenario['installers'])
+
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='delete',
+ locate={'installer': 'apex'})
+ return update, scenario
+
+ @_execute
+ def test_addVersion(self, scenario):
+ add = models.ScenarioVersion(version='danube', projects=list())
+ scenario['installers'][0]['versions'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='version',
+ op='add',
+ locate={'installer': 'apex'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteVersion(self, scenario):
+ scenario['installers'][0]['versions'] = filter(
+ lambda f: f['version'] != 'master',
+ scenario['installers'][0]['versions'])
+
+ update = models.ScenarioUpdateRequest(field='version',
+ op='delete',
+ locate={'installer': 'apex',
+ 'version': 'master'})
+ return update, scenario
+
+ @_execute
+ def test_changeOwner(self, scenario):
+ scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
+
+ update = models.ScenarioUpdateRequest(field='owner',
+ op='update',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term={'owner': 'lucy'})
+ return update, scenario
+
+ @_execute
+ def test_addProject(self, scenario):
+ add = models.ScenarioProject(project='qtip').format()
+ scenario['installers'][0]['versions'][0]['projects'].append(add)
+ update = models.ScenarioUpdateRequest(field='project',
+ op='add',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteProject(self, scenario):
+ scenario['installers'][0]['versions'][0]['projects'] = filter(
+ lambda f: f['project'] != 'functest',
+ scenario['installers'][0]['versions'][0]['projects'])
+
+ update = models.ScenarioUpdateRequest(field='project',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'})
+ return update, scenario
+
+ @_execute
+ def test_addCustoms(self, scenario):
+ add = ['odl', 'parser', 'vping_ssh']
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteCustoms(self, scenario):
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=['vping_ssh'])
+ return update, scenario
+
+ @_execute
+ def test_addScore(self, scenario):
+ add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['scores'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='score',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_addTi(self, scenario):
+ add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['trust_indicators'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='trust_indicator',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ def _update_and_assert(self, update_req, new_scenario, name=None):
+ code, _ = self.update(update_req, self.scenario)
+ self.assertEqual(code, constants.HTTP_OK)
+ self._get_and_assert(self._none_default(name, self.scenario),
+ new_scenario)
+
+ @staticmethod
+ def _none_default(check, default):
+ return check if check else default
+
+
+class TestScenarioDelete(TestScenarioBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, constants.HTTP_NOT_FOUND)
+
+ def test_success(self):
+ scenario = self.create_return_name(self.req_d)
+ code, _ = self.delete(scenario)
+ self.assertEqual(code, constants.HTTP_OK)
+ code, _ = self.get(scenario)
+ self.assertEqual(code, constants.HTTP_NOT_FOUND)