X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Fcommon%2Fcheck.py;h=e80b1c6b78508dc0acb241aef6927540f16e838d;hb=cf7a4aa78910d8c401d018a769863097e63b7f4a;hp=be4b1df12210b17506cba157f12560efc589dc9a;hpb=82fc0ad06c8195e1c0f8b7d6276321fe5ef2daea;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/common/check.py b/utils/test/testapi/opnfv_testapi/common/check.py index be4b1df12..e80b1c6b7 100644 --- a/utils/test/testapi/opnfv_testapi/common/check.py +++ b/utils/test/testapi/opnfv_testapi/common/check.py @@ -7,24 +7,42 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## import functools +import re -from tornado import web, gen +from tornado import gen -from opnfv_testapi.common import raises, message +from opnfv_testapi.common import constants +from opnfv_testapi.common import message +from opnfv_testapi.common import raises +from opnfv_testapi.db import api as dbapi -def authenticate(method): - @web.asynchronous - @gen.coroutine +def is_authorized(method): @functools.wraps(method) def wrapper(self, *args, **kwargs): - if self.auth: + if self.table in ['pods']: + testapi_id = self.get_secure_cookie(constants.TESTAPI_ID) + if not testapi_id: + raises.Unauthorized(message.not_login()) + user_info = yield dbapi.db_find_one('users', {'user': testapi_id}) + if not user_info: + raises.Unauthorized(message.not_lfid()) + kwargs['owner'] = testapi_id + ret = yield gen.coroutine(method)(self, *args, **kwargs) + raise gen.Return(ret) + return wrapper + + +def valid_token(method): + @functools.wraps(method) + def wrapper(self, *args, **kwargs): + if self.auth and self.table == 'results': try: token = self.request.headers['X-Auth-Token'] except KeyError: raises.Unauthorized(message.unauthorized()) query = {'access_token': token} - check = yield self._eval_db_find_one(query, 'tokens') + check = yield dbapi.db_find_one('tokens', query) if not check: raises.Forbidden(message.invalid_token()) ret = yield gen.coroutine(method)(self, *args, **kwargs) @@ -36,7 +54,7 @@ def not_exist(xstep): @functools.wraps(xstep) def wrap(self, *args, **kwargs): query = kwargs.get('query') - data = yield self._eval_db_find_one(query) + data = yield dbapi.db_find_one(self.table, query) if not data: raises.NotFound(message.not_found(self.table, query)) ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs) @@ -59,7 +77,7 @@ def no_body(xstep): def miss_fields(xstep): @functools.wraps(xstep) def wrap(self, *args, **kwargs): - fields = kwargs.get('miss_fields') + fields = kwargs.pop('miss_fields', []) if fields: for miss in fields: miss_data = self.json_args.get(miss) @@ -73,10 +91,10 @@ def miss_fields(xstep): def carriers_exist(xstep): @functools.wraps(xstep) def wrap(self, *args, **kwargs): - carriers = kwargs.get('carriers') + carriers = kwargs.pop('carriers', {}) if carriers: for table, query in carriers: - exist = yield self._eval_db_find_one(query(), table) + exist = yield dbapi.db_find_one(table, query()) if not exist: raises.Forbidden(message.not_found(table, query())) ret = yield gen.coroutine(xstep)(self, *args, **kwargs) @@ -89,7 +107,12 @@ def new_not_exists(xstep): def wrap(self, *args, **kwargs): query = kwargs.get('query') if query: - to_data = yield self._eval_db_find_one(query()) + query_data = query() + if self.table == 'pods': + if query_data.get('name') is not None: + query_data['name'] = re.compile(query_data.get('name'), + re.IGNORECASE) + to_data = yield dbapi.db_find_one(self.table, query_data) if to_data: raises.Forbidden(message.exist(self.table, query())) ret = yield gen.coroutine(xstep)(self, *args, **kwargs) @@ -100,10 +123,10 @@ def new_not_exists(xstep): def updated_one_not_exist(xstep): @functools.wraps(xstep) def wrap(self, data, *args, **kwargs): - db_keys = kwargs.get('db_keys') + db_keys = kwargs.pop('db_keys', []) query = self._update_query(db_keys, data) if query: - to_data = yield self._eval_db_find_one(query) + to_data = yield dbapi.db_find_one(self.table, query) if to_data: raises.Forbidden(message.exist(self.table, query)) ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)