##############################################################################
from datetime import datetime
+import functools
import json
from tornado import gen
self.db_testcases = 'testcases'
self.db_results = 'results'
self.db_scenarios = 'scenarios'
+ self.auth = self.settings["auth"]
def prepare(self):
if self.request.method != "GET" and self.request.method != "DELETE":
cls_data = self.table_cls.from_dict(data)
return cls_data.format_http()
- @web.asynchronous
- @gen.coroutine
+ def authenticate(method):
+ @web.asynchronous
+ @gen.coroutine
+ @functools.wraps(method)
+ def wrapper(self, *args, **kwargs):
+ if self.auth:
+ try:
+ token = self.request.headers['X-Auth-Token']
+ except KeyError:
+ raise web.HTTPError(constants.HTTP_UNAUTHORIZED,
+ "No Authentication Header.")
+ query = {'access_token': token}
+ check = yield self._eval_db_find_one(query, 'tokens')
+ if not check:
+ raise web.HTTPError(constants.HTTP_FORBIDDEN,
+ "Invalid Token.")
+ ret = yield gen.coroutine(method)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrapper
+
+ @authenticate
def _create(self, miss_checks, db_checks, **kwargs):
"""
:param miss_checks: [miss1, miss2]
.format(query, self.table))
self.finish_request(self.format_data(data))
- @web.asynchronous
- @gen.coroutine
+ @authenticate
def _delete(self, query):
data = yield self._eval_db_find_one(query)
if data is None:
yield self._eval_db(self.table, 'remove', query)
self.finish_request()
- @web.asynchronous
- @gen.coroutine
+ @authenticate
def _update(self, query, db_keys):
if self.json_args is None:
raise web.HTTPError(constants.HTTP_BAD_REQUEST, "No payload")