update scenario and related unittest 53/28253/8
authorSerenaFeng <feng.xiaowei@zte.com.cn>
Wed, 8 Feb 2017 02:15:11 +0000 (10:15 +0800)
committerSerenaFeng <feng.xiaowei@zte.com.cn>
Thu, 9 Feb 2017 08:31:00 +0000 (16:31 +0800)
JIRA: RELENG-163

Change-Id: I137898dc84de5f8cb6cab5de9b4ac5de5e4bb978
Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
utils/test/testapi/opnfv_testapi/resources/handlers.py
utils/test/testapi/opnfv_testapi/resources/scenario_handlers.py
utils/test/testapi/opnfv_testapi/resources/scenario_models.py
utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py

index 5f6c3df..a2628e2 100644 (file)
@@ -172,8 +172,7 @@ class GenericApiHandler(RequestHandler):
                                 .format(new_query, self.table))
 
         # we merge the whole document """
-        edit_request = data.format()
-        edit_request.update(self._update_requests(data))
+        edit_request = self._update_requests(data)
 
         """ Updating the DB """
         yield self._eval_db(self.table, 'update', query, edit_request,
@@ -188,7 +187,10 @@ class GenericApiHandler(RequestHandler):
                                            data.__getattribute__(k))
         if not request:
             raise HTTPError(HTTP_FORBIDDEN, "Nothing to update")
-        return request
+
+        edit_request = data.format()
+        edit_request.update(request)
+        return edit_request
 
     @staticmethod
     def _update_request(edit_request, key, new_value, old_value):
index a9b89eb..a8c1a94 100644 (file)
@@ -1,6 +1,7 @@
 from opnfv_testapi.common.constants import HTTP_FORBIDDEN
 from opnfv_testapi.resources.handlers import GenericApiHandler
 from opnfv_testapi.resources.scenario_models import Scenario
+import opnfv_testapi.resources.scenario_models as models
 from opnfv_testapi.tornado_swagger import swagger
 
 
@@ -104,11 +105,169 @@ class ScenarioGURHandler(GenericScenarioHandler):
         """
             @description: update a single scenario by name
             @param body: fields to be updated
-            @type body: L{ScenarioCreateRequest}
+            @type body: L{ScenarioUpdateRequest}
             @in body: body
             @rtype: L{Scenario}
             @return 200: update success
             @raise 404: scenario not exist
             @raise 403: nothing to update
         """
-        pass
+        query = {'name': name}
+        db_keys = ['name']
+        self._update(query, db_keys)
+
+    def _update_query(self, keys, data):
+        query = dict()
+        equal = True
+        if self._is_rename():
+            new = self._term.get('name')
+            if data.name != new:
+                equal = False
+                query['name'] = new
+
+        return equal, query
+
+    def _update_requests(self, data):
+        updates = {
+            ('name', 'update'): self._update_requests_rename,
+            ('installer', 'add'): self._update_requests_add_installer,
+            ('installer', 'delete'): self._update_requests_delete_installer,
+            ('version', 'add'): self._update_requests_add_version,
+            ('version', 'delete'): self._update_requests_delete_version,
+            ('owner', 'update'): self._update_requests_change_owner,
+            ('project', 'add'): self._update_requests_add_project,
+            ('project', 'delete'): self._update_requests_delete_project,
+            ('customs', 'add'): self._update_requests_add_customs,
+            ('customs', 'delete'): self._update_requests_delete_customs,
+            ('score', 'add'): self._update_requests_add_score,
+            ('trust_indicator', 'add'): self._update_requests_add_ti,
+        }
+
+        updates[(self._field, self._op)](data)
+
+        return data.format()
+
+    def _iter_installers(xstep):
+        def magic(self, data):
+            [xstep(self, installer)
+             for installer in self._filter_installers(data.installers)]
+        return magic
+
+    def _iter_versions(xstep):
+        def magic(self, installer):
+            [xstep(self, version)
+             for version in (self._filter_versions(installer.versions))]
+        return magic
+
+    def _iter_projects(xstep):
+        def magic(self, version):
+            [xstep(self, project)
+             for project in (self._filter_projects(version.projects))]
+        return magic
+
+    def _update_requests_rename(self, data):
+        data.name = self._term.get('name')
+
+    def _update_requests_add_installer(self, data):
+        data.installers.append(models.ScenarioInstaller.from_dict(self._term))
+
+    def _update_requests_delete_installer(self, data):
+        data.installers = self._remove_installers(data.installers)
+
+    @_iter_installers
+    def _update_requests_add_version(self, installer):
+        installer.versions.append(models.ScenarioVersion.from_dict(self._term))
+
+    @_iter_installers
+    def _update_requests_delete_version(self, installer):
+        installer.versions = self._remove_versions(installer.versions)
+
+    @_iter_installers
+    @_iter_versions
+    def _update_requests_change_owner(self, version):
+        version.owner = self._term.get('owner')
+
+    @_iter_installers
+    @_iter_versions
+    def _update_requests_add_project(self, version):
+        version.projects.append(models.ScenarioProject.from_dict(self._term))
+
+    @_iter_installers
+    @_iter_versions
+    def _update_requests_delete_project(self, version):
+        version.projects = self._remove_projects(version.projects)
+
+    @_iter_installers
+    @_iter_versions
+    @_iter_projects
+    def _update_requests_add_customs(self, project):
+        project.customs = list(set(project.customs + self._term))
+
+    @_iter_installers
+    @_iter_versions
+    @_iter_projects
+    def _update_requests_delete_customs(self, project):
+        project.customs = filter(
+            lambda f: f not in self._term,
+            project.customs)
+
+    @_iter_installers
+    @_iter_versions
+    @_iter_projects
+    def _update_requests_add_score(self, project):
+        project.scores.append(
+            models.ScenarioScore.from_dict(self._term))
+
+    @_iter_installers
+    @_iter_versions
+    @_iter_projects
+    def _update_requests_add_ti(self, project):
+        project.trust_indicators.append(
+            models.ScenarioTI.from_dict(self._term))
+
+    def _is_rename(self):
+        return self._field == 'name' and self._op == 'update'
+
+    def _remove_installers(self, installers):
+        return self._remove('installer', installers)
+
+    def _filter_installers(self, installers):
+        return self._filter('installer', installers)
+
+    def _remove_versions(self, versions):
+        return self._remove('version', versions)
+
+    def _filter_versions(self, versions):
+        return self._filter('version', versions)
+
+    def _remove_projects(self, projects):
+        return self._remove('project', projects)
+
+    def _filter_projects(self, projects):
+        return self._filter('project', projects)
+
+    def _remove(self, field, fields):
+        return filter(
+            lambda f: getattr(f, field) != self._locate.get(field),
+            fields)
+
+    def _filter(self, field, fields):
+        return filter(
+            lambda f: getattr(f, field) == self._locate.get(field),
+            fields)
+
+    @property
+    def _field(self):
+        return self.json_args.get('field')
+
+    @property
+    def _op(self):
+        return self.json_args.get('op')
+
+    @property
+    def _locate(self):
+        return self.json_args.get('locate')
+
+    @property
+    def _term(self):
+        return self.json_args.get('term')
index f89a124..73bcbe9 100644 (file)
@@ -2,6 +2,14 @@ import models
 from opnfv_testapi.tornado_swagger import swagger
 
 
+def list_default(value):
+    return value if value else list()
+
+
+def dict_default(value):
+    return value if value else dict()
+
+
 @swagger.model()
 class ScenarioTI(models.ModelBase):
     def __init__(self, date=None, status='silver'):
@@ -32,9 +40,9 @@ class ScenarioProject(models.ModelBase):
                  scores=None,
                  trust_indicators=None):
         self.project = project
-        self.customs = customs
-        self.scores = scores
-        self.trust_indicators = trust_indicators
+        self.customs = list_default(customs)
+        self.scores = list_default(scores)
+        self.trust_indicators = list_default(trust_indicators)
 
     @staticmethod
     def attr_parser():
@@ -50,7 +58,7 @@ class ScenarioVersion(models.ModelBase):
     """
     def __init__(self, version=None, projects=None):
         self.version = version
-        self.projects = projects
+        self.projects = list_default(projects)
 
     @staticmethod
     def attr_parser():
@@ -65,7 +73,7 @@ class ScenarioInstaller(models.ModelBase):
     """
     def __init__(self, installer=None, versions=None):
         self.installer = installer
-        self.versions = versions if versions else list()
+        self.versions = list_default(versions)
 
     @staticmethod
     def attr_parser():
@@ -80,13 +88,28 @@ class ScenarioCreateRequest(models.ModelBase):
     """
     def __init__(self, name='', installers=None):
         self.name = name
-        self.installers = installers if installers else list()
+        self.installers = list_default(installers)
 
     @staticmethod
     def attr_parser():
         return {'installers': ScenarioInstaller}
 
 
+@swagger.model()
+class ScenarioUpdateRequest(models.ModelBase):
+    """
+        @property field: update field
+        @property op: add/delete/update
+        @property locate: information used to locate the field
+        @property term: new value
+    """
+    def __init__(self, field=None, op=None, locate=None, term=None):
+        self.field = field
+        self.op = op
+        self.locate = dict_default(locate)
+        self.term = dict_default(term)
+
+
 @swagger.model()
 class Scenario(models.ModelBase):
     """
@@ -97,7 +120,7 @@ class Scenario(models.ModelBase):
         self.name = name
         self._id = _id
         self.creation_date = create_date
-        self.installers = installers if installers else list()
+        self.installers = list_default(installers)
 
     @staticmethod
     def attr_parser():
index ff59795..c15dc32 100644 (file)
@@ -1,20 +1,20 @@
+from copy import deepcopy
 import json
 import os
+from datetime import datetime
 
 from opnfv_testapi.common.constants import HTTP_BAD_REQUEST
 from opnfv_testapi.common.constants import HTTP_FORBIDDEN
 from opnfv_testapi.common.constants import HTTP_OK
-from opnfv_testapi.resources.scenario_models import Scenario
-from opnfv_testapi.resources.scenario_models import ScenarioCreateRequest
-from opnfv_testapi.resources.scenario_models import Scenarios
+import opnfv_testapi.resources.scenario_models as models
 from test_testcase import TestBase
 
 
 class TestScenarioBase(TestBase):
     def setUp(self):
         super(TestScenarioBase, self).setUp()
-        self.get_res = Scenario
-        self.list_res = Scenarios
+        self.get_res = models.Scenario
+        self.list_res = models.Scenarios
         self.basePath = '/api/v1/scenarios'
         self.req_d = self._load_request('scenario-c1.json')
         self.req_2 = self._load_request('scenario-c2.json')
@@ -46,6 +46,17 @@ class TestScenarioBase(TestBase):
         self.assertIsNotNone(scenario_dict['creation_date'])
         self.assertDictContainsSubset(req, scenario_dict)
 
+    @staticmethod
+    def _set_query(*args):
+        uri = ''
+        for arg in args:
+            uri += arg + '&'
+        return uri[0: -1]
+
+    def _get_and_assert(self, name, req=None):
+        code, body = self.get(name)
+        self.assert_res(code, body, req)
+
 
 class TestScenarioCreate(TestScenarioBase):
     def test_withoutBody(self):
@@ -53,13 +64,13 @@ class TestScenarioCreate(TestScenarioBase):
         self.assertEqual(code, HTTP_BAD_REQUEST)
 
     def test_emptyName(self):
-        req_empty = ScenarioCreateRequest('')
+        req_empty = models.ScenarioCreateRequest('')
         (code, body) = self.create(req_empty)
         self.assertEqual(code, HTTP_BAD_REQUEST)
         self.assertIn('name missing', body)
 
     def test_noneName(self):
-        req_none = ScenarioCreateRequest(None)
+        req_none = models.ScenarioCreateRequest(None)
         (code, body) = self.create(req_none)
         self.assertEqual(code, HTTP_BAD_REQUEST)
         self.assertIn('name missing', body)
@@ -83,8 +94,7 @@ class TestScenarioGet(TestScenarioBase):
         self.scenario_2 = self.create_return_name(self.req_2)
 
     def test_getByName(self):
-        code, body = self.get(self.scenario_1)
-        self.assert_res(code, body, req=self.req_d)
+        self._get_and_assert(self.scenario_1, self.req_d)
 
     def test_getAll(self):
         self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
@@ -113,13 +123,6 @@ class TestScenarioGet(TestScenarioBase):
 
         self._query_and_assert(query, reqs=[self.req_d])
 
-    @staticmethod
-    def _set_query(*args):
-        uri = ''
-        for arg in args:
-            uri += arg + '&'
-        return uri[0: -1]
-
     def _query_and_assert(self, query, found=True, reqs=None):
         code, body = self.query(query)
         if not found:
@@ -131,3 +134,172 @@ class TestScenarioGet(TestScenarioBase):
                 for scenario in body.scenarios:
                     if req['name'] == scenario.name:
                         self.assert_res(code, scenario, req)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+    def setUp(self):
+        super(TestScenarioUpdate, self).setUp()
+        self.scenario = self.create_return_name(self.req_d)
+
+    def _execute(set_update):
+        def magic(self):
+            update, scenario = set_update(self, deepcopy(self.req_d))
+            self._update_and_assert(update, scenario)
+        return magic
+
+    def test_renameScenario(self):
+        new_name = 'nosdn-nofeature-noha'
+        new_scenario = deepcopy(self.req_d)
+        new_scenario['name'] = new_name
+        update_req = models.ScenarioUpdateRequest(field='name',
+                                                  op='update',
+                                                  locate={},
+                                                  term={'name': new_name})
+        self._update_and_assert(update_req, new_scenario, new_name)
+
+    @_execute
+    def test_addInstaller(self, scenario):
+        add = models.ScenarioInstaller(installer='daisy', versions=list())
+        scenario['installers'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='installer',
+                                              op='add',
+                                              locate={},
+                                              term=add.format())
+        return update, scenario
+
+    @_execute
+    def test_deleteInstaller(self, scenario):
+        scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
+                                        scenario['installers'])
+
+        update = models.ScenarioUpdateRequest(field='installer',
+                                              op='delete',
+                                              locate={'installer': 'apex'})
+        return update, scenario
+
+    @_execute
+    def test_addVersion(self, scenario):
+        add = models.ScenarioVersion(version='danube', projects=list())
+        scenario['installers'][0]['versions'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='version',
+                                              op='add',
+                                              locate={'installer': 'apex'},
+                                              term=add.format())
+        return update, scenario
+
+    @_execute
+    def test_deleteVersion(self, scenario):
+        scenario['installers'][0]['versions'] = filter(
+            lambda f: f['version'] != 'master',
+            scenario['installers'][0]['versions'])
+
+        update = models.ScenarioUpdateRequest(field='version',
+                                              op='delete',
+                                              locate={'installer': 'apex',
+                                                      'version': 'master'})
+        return update, scenario
+
+    @_execute
+    def test_changeOwner(self, scenario):
+        scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
+
+        update = models.ScenarioUpdateRequest(field='owner',
+                                              op='update',
+                                              locate={'installer': 'apex',
+                                                      'version': 'master'},
+                                              term={'owner': 'lucy'})
+        return update, scenario
+
+    @_execute
+    def test_addProject(self, scenario):
+        add = models.ScenarioProject(project='qtip').format()
+        scenario['installers'][0]['versions'][0]['projects'].append(add)
+        update = models.ScenarioUpdateRequest(field='project',
+                                              op='add',
+                                              locate={'installer': 'apex',
+                                                      'version': 'master'},
+                                              term=add)
+        return update, scenario
+
+    @_execute
+    def test_deleteProject(self, scenario):
+        scenario['installers'][0]['versions'][0]['projects'] = filter(
+            lambda f: f['project'] != 'functest',
+            scenario['installers'][0]['versions'][0]['projects'])
+
+        update = models.ScenarioUpdateRequest(field='project',
+                                              op='delete',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'})
+        return update, scenario
+
+    @_execute
+    def test_addCustoms(self, scenario):
+        add = ['odl', 'parser', 'vping_ssh']
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
+        update = models.ScenarioUpdateRequest(field='customs',
+                                              op='add',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=add)
+        return update, scenario
+
+    @_execute
+    def test_deleteCustoms(self, scenario):
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['customs'] = ['healthcheck']
+        update = models.ScenarioUpdateRequest(field='customs',
+                                              op='delete',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=['vping_ssh'])
+        return update, scenario
+
+    @_execute
+    def test_addScore(self, scenario):
+        add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['scores'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='score',
+                                              op='add',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=add.format())
+        return update, scenario
+
+    @_execute
+    def test_addTi(self, scenario):
+        add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+        projects = scenario['installers'][0]['versions'][0]['projects']
+        functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+        functest['trust_indicators'].append(add.format())
+        update = models.ScenarioUpdateRequest(field='trust_indicator',
+                                              op='add',
+                                              locate={
+                                                  'installer': 'apex',
+                                                  'version': 'master',
+                                                  'project': 'functest'},
+                                              term=add.format())
+        return update, scenario
+
+    def _update_and_assert(self, update_req, new_scenario, name=None):
+        code, _ = self.update(update_req, self.scenario)
+        self.assertEqual(code, HTTP_OK)
+        self._get_and_assert(self._none_default(name, self.scenario),
+                             new_scenario)
+
+    @staticmethod
+    def _none_default(check, default):
+        return check if check else default