# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import functools
+import re
-import cas
from tornado import gen
-from tornado import web
from opnfv_testapi.common import constants
from opnfv_testapi.common import message
from opnfv_testapi.common import raises
-from opnfv_testapi.common.config import CONF
from opnfv_testapi.db import api as dbapi
-def login(method):
- @web.asynchronous
- @gen.coroutine
+def is_authorized(method):
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
- ticket = self.get_query_argument('ticket', default=None)
- if ticket:
- client = cas.CASClient(version='2',
- server_url=CONF.lfid_cas_url,
- service_url=CONF.ui_url)
- (user, attrs, _) = client.verify_ticket(ticket=ticket)
- print 'login user: {}'.format(user)
- login_user = {
- 'user': user,
- 'email': attrs.get('mail'),
- 'fullname': attrs.get('field_lf_full_name'),
- 'groups': constants.TESTAPI_USERS + attrs.get('group', [])
- }
- q_user = {'user': user}
- db_user = yield dbapi.db_find_one(constants.USER_TABLE, q_user)
- if not db_user:
- dbapi.db_save(constants.USER_TABLE, login_user)
- else:
- dbapi.db_update(constants.USER_TABLE, q_user, login_user)
-
- self.clear_cookie(constants.TESTAPI_ID)
- self.set_secure_cookie(constants.TESTAPI_ID, user)
+ if self.table in ['pods']:
+ testapi_id = self.get_secure_cookie(constants.TESTAPI_ID)
+ if not testapi_id:
+ raises.Unauthorized(message.not_login())
+ user_info = yield dbapi.db_find_one('users', {'user': testapi_id})
+ if not user_info:
+ raises.Unauthorized(message.not_lfid())
+ kwargs['owner'] = testapi_id
ret = yield gen.coroutine(method)(self, *args, **kwargs)
raise gen.Return(ret)
return wrapper
-def authenticate(method):
- @web.asynchronous
- @gen.coroutine
+def valid_token(method):
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
- if self.auth:
+ if self.auth and self.table == 'results':
try:
token = self.request.headers['X-Auth-Token']
except KeyError:
def wrap(self, *args, **kwargs):
query = kwargs.get('query')
if query:
- to_data = yield dbapi.db_find_one(self.table, query())
+ query_data = query()
+ if self.table == 'pods':
+ if query_data.get('name') is not None:
+ query_data['name'] = re.compile(query_data.get('name'),
+ re.IGNORECASE)
+ to_data = yield dbapi.db_find_one(self.table, query_data)
if to_data:
raises.Forbidden(message.exist(self.table, query()))
ret = yield gen.coroutine(xstep)(self, *args, **kwargs)