X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=utils%2Ftest%2Ftestapi%2Fopnfv_testapi%2Fcommon%2Fcheck.py;h=fd30c9b3f4db245c5772f5672ddfe3ef9741c4df;hb=410025769a5f59469722704cdb1d53bfe1d20ba0;hp=67e8fbd409ab9005d30404f825582429c48a27b1;hpb=1261ce7747b463a0f70d228567be4829d43d148f;p=releng.git diff --git a/utils/test/testapi/opnfv_testapi/common/check.py b/utils/test/testapi/opnfv_testapi/common/check.py index 67e8fbd40..fd30c9b3f 100644 --- a/utils/test/testapi/opnfv_testapi/common/check.py +++ b/utils/test/testapi/opnfv_testapi/common/check.py @@ -7,26 +7,43 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## import functools +import re from tornado import gen -from tornado import web +from opnfv_testapi.common import constants from opnfv_testapi.common import message from opnfv_testapi.common import raises +from opnfv_testapi.common.config import CONF +from opnfv_testapi.db import api as dbapi -def authenticate(method): - @web.asynchronous - @gen.coroutine +def is_authorized(method): @functools.wraps(method) def wrapper(self, *args, **kwargs): - if self.auth: + if CONF.api_authenticate and self.table in ['pods']: + testapi_id = self.get_secure_cookie(constants.TESTAPI_ID) + if not testapi_id: + raises.Unauthorized(message.not_login()) + user_info = yield dbapi.db_find_one('users', {'user': testapi_id}) + if not user_info: + raises.Unauthorized(message.not_lfid()) + kwargs['owner'] = testapi_id + ret = yield gen.coroutine(method)(self, *args, **kwargs) + raise gen.Return(ret) + return wrapper + + +def valid_token(method): + @functools.wraps(method) + def wrapper(self, *args, **kwargs): + if self.auth and self.table == 'results': try: token = self.request.headers['X-Auth-Token'] except KeyError: raises.Unauthorized(message.unauthorized()) query = {'access_token': token} - check = yield self._eval_db_find_one(query, 'tokens') + check = yield dbapi.db_find_one('tokens', query) if not check: raises.Forbidden(message.invalid_token()) ret = yield gen.coroutine(method)(self, *args, **kwargs) @@ -38,7 +55,7 @@ def not_exist(xstep): @functools.wraps(xstep) def wrap(self, *args, **kwargs): query = kwargs.get('query') - data = yield self._eval_db_find_one(query) + data = yield dbapi.db_find_one(self.table, query) if not data: raises.NotFound(message.not_found(self.table, query)) ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs) @@ -78,7 +95,7 @@ def carriers_exist(xstep): carriers = kwargs.pop('carriers', {}) if carriers: for table, query in carriers: - exist = yield self._eval_db_find_one(query(), table) + exist = yield dbapi.db_find_one(table, query()) if not exist: raises.Forbidden(message.not_found(table, query())) ret = yield gen.coroutine(xstep)(self, *args, **kwargs) @@ -91,7 +108,12 @@ def new_not_exists(xstep): def wrap(self, *args, **kwargs): query = kwargs.get('query') if query: - to_data = yield self._eval_db_find_one(query()) + query_data = query() + if self.table == 'pods': + if query_data.get('name') is not None: + query_data['name'] = re.compile(query_data.get('name'), + re.IGNORECASE) + to_data = yield dbapi.db_find_one(self.table, query_data) if to_data: raises.Forbidden(message.exist(self.table, query())) ret = yield gen.coroutine(xstep)(self, *args, **kwargs) @@ -105,7 +127,7 @@ def updated_one_not_exist(xstep): db_keys = kwargs.pop('db_keys', []) query = self._update_query(db_keys, data) if query: - to_data = yield self._eval_db_find_one(query) + to_data = yield dbapi.db_find_one(self.table, query) if to_data: raises.Forbidden(message.exist(self.table, query)) ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)