Code Review
/
releng.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
leverage token_check only when posting results
[releng.git]
/
utils
/
test
/
testapi
/
opnfv_testapi
/
common
/
check.py
diff --git
a/utils/test/testapi/opnfv_testapi/common/check.py
b/utils/test/testapi/opnfv_testapi/common/check.py
index
4d9902c
..
9ded48d
100644
(file)
--- a/
utils/test/testapi/opnfv_testapi/common/check.py
+++ b/
utils/test/testapi/opnfv_testapi/common/check.py
@@
-7,26
+7,25
@@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import functools
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import functools
+import re
from tornado import gen
from tornado import gen
-from tornado import web
from opnfv_testapi.common import message
from opnfv_testapi.common import raises
from opnfv_testapi.common import message
from opnfv_testapi.common import raises
+from opnfv_testapi.db import api as dbapi
-def authenticate(method):
- @web.asynchronous
- @gen.coroutine
+def valid_token(method):
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
- if self.auth:
+ if self.auth
and self.table == 'results'
:
try:
token = self.request.headers['X-Auth-Token']
except KeyError:
raises.Unauthorized(message.unauthorized())
query = {'access_token': token}
try:
token = self.request.headers['X-Auth-Token']
except KeyError:
raises.Unauthorized(message.unauthorized())
query = {'access_token': token}
- check = yield
self._eval_db_find_one(query, 'tokens'
)
+ check = yield
dbapi.db_find_one('tokens', query
)
if not check:
raises.Forbidden(message.invalid_token())
ret = yield gen.coroutine(method)(self, *args, **kwargs)
if not check:
raises.Forbidden(message.invalid_token())
ret = yield gen.coroutine(method)(self, *args, **kwargs)
@@
-38,7
+37,7
@@
def not_exist(xstep):
@functools.wraps(xstep)
def wrap(self, *args, **kwargs):
query = kwargs.get('query')
@functools.wraps(xstep)
def wrap(self, *args, **kwargs):
query = kwargs.get('query')
- data = yield
self._eval_db_find_one(
query)
+ data = yield
dbapi.db_find_one(self.table,
query)
if not data:
raises.NotFound(message.not_found(self.table, query))
ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
if not data:
raises.NotFound(message.not_found(self.table, query))
ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
@@
-61,7
+60,7
@@
def no_body(xstep):
def miss_fields(xstep):
@functools.wraps(xstep)
def wrap(self, *args, **kwargs):
def miss_fields(xstep):
@functools.wraps(xstep)
def wrap(self, *args, **kwargs):
- fields = kwargs.
get('miss_fields'
)
+ fields = kwargs.
pop('miss_fields', []
)
if fields:
for miss in fields:
miss_data = self.json_args.get(miss)
if fields:
for miss in fields:
miss_data = self.json_args.get(miss)
@@
-75,10
+74,10
@@
def miss_fields(xstep):
def carriers_exist(xstep):
@functools.wraps(xstep)
def wrap(self, *args, **kwargs):
def carriers_exist(xstep):
@functools.wraps(xstep)
def wrap(self, *args, **kwargs):
- carriers = kwargs.
get('carriers'
)
+ carriers = kwargs.
pop('carriers', {}
)
if carriers:
for table, query in carriers:
if carriers:
for table, query in carriers:
- exist = yield
self._eval_db_find_one(query(), table
)
+ exist = yield
dbapi.db_find_one(table, query()
)
if not exist:
raises.Forbidden(message.not_found(table, query()))
ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
if not exist:
raises.Forbidden(message.not_found(table, query()))
ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
@@
-91,7
+90,12
@@
def new_not_exists(xstep):
def wrap(self, *args, **kwargs):
query = kwargs.get('query')
if query:
def wrap(self, *args, **kwargs):
query = kwargs.get('query')
if query:
- to_data = yield self._eval_db_find_one(query())
+ query_data = query()
+ if self.table == 'pods':
+ if query_data.get('name') is not None:
+ query_data['name'] = re.compile(query_data.get('name'),
+ re.IGNORECASE)
+ to_data = yield dbapi.db_find_one(self.table, query_data)
if to_data:
raises.Forbidden(message.exist(self.table, query()))
ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
if to_data:
raises.Forbidden(message.exist(self.table, query()))
ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
@@
-102,10
+106,10
@@
def new_not_exists(xstep):
def updated_one_not_exist(xstep):
@functools.wraps(xstep)
def wrap(self, data, *args, **kwargs):
def updated_one_not_exist(xstep):
@functools.wraps(xstep)
def wrap(self, data, *args, **kwargs):
- db_keys = kwargs.
get('db_keys'
)
+ db_keys = kwargs.
pop('db_keys', []
)
query = self._update_query(db_keys, data)
if query:
query = self._update_query(db_keys, data)
if query:
- to_data = yield
self._eval_db_find_one(
query)
+ to_data = yield
dbapi.db_find_one(self.table,
query)
if to_data:
raises.Forbidden(message.exist(self.table, query))
ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
if to_data:
raises.Forbidden(message.exist(self.table, query))
ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)