add rename scenario exceptions
[releng.git] / utils / test / testapi / opnfv_testapi / tests / unit / test_scenario.py
index ff59795..b62c1d2 100644 (file)
@@ -1,20 +1,19 @@
+from copy import deepcopy
+from datetime import datetime
+import functools
 import json
 import os
 
-from opnfv_testapi.common.constants import HTTP_BAD_REQUEST
-from opnfv_testapi.common.constants import HTTP_FORBIDDEN
-from opnfv_testapi.common.constants import HTTP_OK
-from opnfv_testapi.resources.scenario_models import Scenario
-from opnfv_testapi.resources.scenario_models import ScenarioCreateRequest
-from opnfv_testapi.resources.scenario_models import Scenarios
-from test_testcase import TestBase
+from opnfv_testapi.common import constants
+import opnfv_testapi.resources.scenario_models as models
+import test_base as base
 
 
-class TestScenarioBase(TestBase):
+class TestScenarioBase(base.TestBase):
     def setUp(self):
         super(TestScenarioBase, self).setUp()
-        self.get_res = Scenario
-        self.list_res = Scenarios
+        self.get_res = models.Scenario
+        self.list_res = models.Scenarios
         self.basePath = '/api/v1/scenarios'
         self.req_d = self._load_request('scenario-c1.json')
         self.req_2 = self._load_request('scenario-c2.json')
@@ -38,41 +37,52 @@ class TestScenarioBase(TestBase):
         return res.href.split('/')[-1]
 
     def assert_res(self, code, scenario, req=None):
-        self.assertEqual(code, HTTP_OK)
+        self.assertEqual(code, constants.HTTP_OK)
         if req is None:
             req = self.req_d
-        scenario_dict = scenario.format_http()
-        self.assertIsNotNone(scenario_dict['_id'])
-        self.assertIsNotNone(scenario_dict['creation_date'])
-        self.assertDictContainsSubset(req, scenario_dict)
+        self.assertIsNotNone(scenario._id)
+        self.assertIsNotNone(scenario.creation_date)
+
+        scenario == models.Scenario.from_dict(req)
+
+    @staticmethod
+    def _set_query(*args):
+        uri = ''
+        for arg in args:
+            uri += arg + '&'
+        return uri[0: -1]
+
+    def _get_and_assert(self, name, req=None):
+        code, body = self.get(name)
+        self.assert_res(code, body, req)
 
 
 class TestScenarioCreate(TestScenarioBase):
     def test_withoutBody(self):
         (code, body) = self.create()
-        self.assertEqual(code, HTTP_BAD_REQUEST)
+        self.assertEqual(code, constants.HTTP_BAD_REQUEST)
 
     def test_emptyName(self):
-        req_empty = ScenarioCreateRequest('')
+        req_empty = models.ScenarioCreateRequest('')
         (code, body) = self.create(req_empty)
-        self.assertEqual(code, HTTP_BAD_REQUEST)
+        self.assertEqual(code, constants.HTTP_BAD_REQUEST)
         self.assertIn('name missing', body)
 
     def test_noneName(self):
-        req_none = ScenarioCreateRequest(None)
+        req_none = models.ScenarioCreateRequest(None)
         (code, body) = self.create(req_none)
-        self.assertEqual(code, HTTP_BAD_REQUEST)
+        self.assertEqual(code, constants.HTTP_BAD_REQUEST)
         self.assertIn('name missing', body)
 
     def test_success(self):
         (code, body) = self.create_d()
-        self.assertEqual(code, HTTP_OK)
+        self.assertEqual(code, constants.HTTP_OK)
         self.assert_create_body(body)
 
     def test_alreadyExist(self):
         self.create_d()
         (code, body) = self.create_d()
-        self.assertEqual(code, HTTP_FORBIDDEN)
+        self.assertEqual(code, constants.HTTP_FORBIDDEN)
         self.assertIn('already exists', body)
 
 
@@ -83,8 +93,7 @@ class TestScenarioGet(TestScenarioBase):
         self.scenario_2 = self.create_return_name(self.req_2)
 
     def test_getByName(self):
-        code, body = self.get(self.scenario_1)
-        self.assert_res(code, body, req=self.req_d)
+        self._get_and_assert(self.scenario_1, self.req_d)
 
     def test_getAll(self):
         self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
@@ -113,17 +122,10 @@ class TestScenarioGet(TestScenarioBase):
 
         self._query_and_assert(query, reqs=[self.req_d])
 
-    @staticmethod
-    def _set_query(*args):
-        uri = ''
-        for arg in args:
-            uri += arg + '&'
-        return uri[0: -1]
-
     def _query_and_assert(self, query, found=True, reqs=None):
         code, body = self.query(query)
         if not found:
-            self.assertEqual(code, HTTP_OK)
+            self.assertEqual(code, constants.HTTP_OK)
             self.assertEqual(0, len(body.scenarios))
         else:
             self.assertEqual(len(reqs), len(body.scenarios))
@@ -131,3 +133,227 @@ class TestScenarioGet(TestScenarioBase):
                 for scenario in body.scenarios:
                     if req['name'] == scenario.name:
                         self.assert_res(code, scenario, req)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+    def setUp(self):
+        super(TestScenarioUpdate, self).setUp()
+        self.scenario = self.create_return_name(self.req_d)
+        self.scenario_2 = self.create_return_name(self.req_2)
+
+    def _execute(set_update):
+        @functools.wraps(set_update)
+        def magic(self):
+            update, scenario = set_update(self, deepcopy(self.req_d))
+            self._update_and_assert(update, scenario)
+        return magic
+
+    def _update(expected):
+        def _update(set_update):
+            @functools.wraps(set_update)
+            def wrap(self):
+                update, scenario = set_update(self, deepcopy(self.req_d))
+                code, body = self.update(update, self.scenario)
+                getattr(self, expected)(code, scenario)
+            return wrap
+        return _update
+
+    @_update('_success')
+    def test_renameScenario(self, scenario):
+        new_name = 'nosdn-nofeature-noha'
+        scenario['name'] = new_name
+        update_req = models.ScenarioUpdateRequest(field='name',
+                                                  op='update',
+                                                  locate={},
+                                                  term={'name': new_name})
+        return update_req, scenario
+
+    @_update('_forbidden')
+    def test_renameScenario_exist(self, scenario):
+        new_name = self.scenario_2
+        scenario['name'] = new_name
+        update_req = models.ScenarioUpdateRequest(field='name',
+                                                  op='update',
+                                                  locate={},
+                                                  term={'name': new_name})
+        return update_req, scenario
+
+    @_update('_bad_request')
+    def test_renameScenario_noName(self, scenario):
+        new_name = self.scenario_2
+        scenario['name'] = new_name
+        update_req = models.ScenarioUpdateRequest(field='name',
+                                                  op='update',
+                                                  locate={},
+                                                  term={})
+        return update_req, scenario
+
+    @_execute
+    def test_addInstaller(self, scenario):
+        add = models.ScenarioInstaller(installer='daisy', versions=list())
+        scenario['installers'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='installer',
+                                              op='add',
+                                              locate={},
+                                              term=add.format())
+        return update, scenario
+
+    @_execute
+    def test_deleteInstaller(self, scenario):
+        scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
+                                        scenario['installers'])
+
+        update = models.ScenarioUpdateRequest(field='installer',
+                                              op='delete',
+                                              locate={'installer': 'apex'})
+        return update, scenario
+
+    @_execute
+    def test_addVersion(self, scenario):
+        add = models.ScenarioVersion(version='danube', projects=list())
+        scenario['installers'][0]['versions'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='version',
+                                              op='add',
+                                              locate={'installer': 'apex'},
+                                              term=add.format())
+        return update, scenario
+
+    @_execute
+    def test_deleteVersion(self, scenario):
+        scenario['installers'][0]['versions'] = filter(
+            lambda f: f['version'] != 'master',
+            scenario['installers'][0]['versions'])
+
+        update = models.ScenarioUpdateRequest(field='version',
+                                              op='delete',
+                                              locate={'installer': 'apex',
+                                                      'version': 'master'})
+        return update, scenario
+
+    @_execute
+    def test_changeOwner(self, scenario):
+        scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
+
+        update = models.ScenarioUpdateRequest(field='owner',
+                                              op='update',
+                                              locate={'installer': 'apex',
+                                                      'version': 'master'},
+                                              term={'owner': 'lucy'})
+        return update, scenario
+
+    @_execute
+    def test_addProject(self, scenario):
+        add = models.ScenarioProject(project='qtip').format()
+        scenario['installers'][0]['versions'][0]['projects'].append(add)
+        update = models.ScenarioUpdateRequest(field='project',
+                                              op='add',
+                                              locate={'installer': 'apex',
+                                                      'version': 'master'},
+                                              term=add)
+        return update, scenario
+
+    @_execute
+    def test_deleteProject(self, scenario):
+        scenario['installers'][0]['versions'][0]['projects'] = filter(
+            lambda f: f['project'] != 'functest',
+            scenario['installers'][0]['versions'][0]['projects'])
+
+        update = models.ScenarioUpdateRequest(field='project',
+                                              op='delete',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'})
+        return update, scenario
+
+    @_execute
+    def test_addCustoms(self, scenario):
+        add = ['odl', 'parser', 'vping_ssh']
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
+        update = models.ScenarioUpdateRequest(field='customs',
+                                              op='add',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=add)
+        return update, scenario
+
+    @_execute
+    def test_deleteCustoms(self, scenario):
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['customs'] = ['healthcheck']
+        update = models.ScenarioUpdateRequest(field='customs',
+                                              op='delete',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=['vping_ssh'])
+        return update, scenario
+
+    @_execute
+    def test_addScore(self, scenario):
+        add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['scores'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='score',
+                                              op='add',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=add.format())
+        return update, scenario
+
+    @_execute
+    def test_addTi(self, scenario):
+        add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['trust_indicators'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='trust_indicator',
+                                              op='add',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=add.format())
+        return update, scenario
+
+    def _update_and_assert(self, update_req, new_scenario, name=None):
+        code, _ = self.update(update_req, self.scenario)
+        self.assertEqual(code, constants.HTTP_OK)
+        self._get_and_assert(_none_default(name, self.scenario),
+                             new_scenario)
+
+    def _success(self, status, new_scenario):
+        self.assertEqual(status, constants.HTTP_OK)
+        self._get_and_assert(new_scenario.get('name'), new_scenario)
+
+    def _forbidden(self, status, new_scenario):
+        self.assertEqual(status, constants.HTTP_FORBIDDEN)
+
+    def _bad_request(self, status, new_scenario):
+        self.assertEqual(status, constants.HTTP_BAD_REQUEST)
+
+
+class TestScenarioDelete(TestScenarioBase):
+    def test_notFound(self):
+        code, body = self.delete('notFound')
+        self.assertEqual(code, constants.HTTP_NOT_FOUND)
+
+    def test_success(self):
+        scenario = self.create_return_name(self.req_d)
+        code, _ = self.delete(scenario)
+        self.assertEqual(code, constants.HTTP_OK)
+        code, _ = self.get(scenario)
+        self.assertEqual(code, constants.HTTP_NOT_FOUND)
+
+
+def _none_default(check, default):
+    return check if check else default