##############################################################################
from datetime import datetime
-import functools
import json
from tornado import gen
from tornado import web
-import models
-from opnfv_testapi.common import constants
+from opnfv_testapi.common import check
+from opnfv_testapi.common import message
+from opnfv_testapi.common import raises
+from opnfv_testapi.resources import models
from opnfv_testapi.tornado_swagger import swagger
+DEFAULT_REPRESENTATION = "application/json"
+
class GenericApiHandler(web.RequestHandler):
def __init__(self, application, request, **kwargs):
if self.request.method != "GET" and self.request.method != "DELETE":
if self.request.headers.get("Content-Type") is not None:
if self.request.headers["Content-Type"].startswith(
- constants.DEFAULT_REPRESENTATION):
+ DEFAULT_REPRESENTATION):
try:
self.json_args = json.loads(self.request.body)
except (ValueError, KeyError, TypeError) as error:
- raise web.HTTPError(constants.HTTP_BAD_REQUEST,
- "Bad Json format [{}]".
- format(error))
+ raises.BadRequest(message.bad_format(str(error)))
def finish_request(self, json_object=None):
if json_object:
self.write(json.dumps(json_object))
- self.set_header("Content-Type", constants.DEFAULT_REPRESENTATION)
+ self.set_header("Content-Type", DEFAULT_REPRESENTATION)
self.finish()
def _create_response(self, resource):
cls_data = self.table_cls.from_dict(data)
return cls_data.format_http()
- def authenticate(method):
- @web.asynchronous
- @gen.coroutine
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- if self.auth:
- try:
- token = self.request.headers['X-Auth-Token']
- except KeyError:
- raise web.HTTPError(constants.HTTP_UNAUTHORIZED,
- "No Authentication Header.")
- query = {'access_token': token}
- check = yield self._eval_db_find_one(query, 'tokens')
- if not check:
- raise web.HTTPError(constants.HTTP_FORBIDDEN,
- "Invalid Token.")
- ret = yield gen.coroutine(method)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrapper
-
- @authenticate
- def _create(self, miss_checks, db_checks, **kwargs):
+ @check.authenticate
+ @check.no_body
+ @check.miss_fields
+ @check.carriers_exist
+ @check.new_not_exists
+ def _create(self, **kwargs):
"""
:param miss_checks: [miss1, miss2]
:param db_checks: [(table, exist, query, error)]
"""
- if self.json_args is None:
- raise web.HTTPError(constants.HTTP_BAD_REQUEST, "no body")
-
data = self.table_cls.from_dict(self.json_args)
- for miss in miss_checks:
- miss_data = data.__getattribute__(miss)
- if miss_data is None or miss_data == '':
- raise web.HTTPError(constants.HTTP_BAD_REQUEST,
- '{} missing'.format(miss))
-
for k, v in kwargs.iteritems():
data.__setattr__(k, v)
- for table, exist, query, error in db_checks:
- check = yield self._eval_db_find_one(query(data), table)
- if (exist and not check) or (not exist and check):
- code, message = error(data)
- raise web.HTTPError(code, message)
-
if self.table != 'results':
data.creation_date = datetime.now()
_id = yield self._eval_db(self.table, 'insert', data.format(),
@web.asynchronous
@gen.coroutine
- def _get_one(self, query):
- data = yield self._eval_db_find_one(query)
- if data is None:
- raise web.HTTPError(constants.HTTP_NOT_FOUND,
- "[{}] not exist in table [{}]"
- .format(query, self.table))
+ @check.not_exist
+ def _get_one(self, data, query=None):
self.finish_request(self.format_data(data))
- @authenticate
- def _delete(self, query):
- data = yield self._eval_db_find_one(query)
- if data is None:
- raise web.HTTPError(constants.HTTP_NOT_FOUND,
- "[{}] not exit in table [{}]"
- .format(query, self.table))
-
+ @check.authenticate
+ @check.not_exist
+ def _delete(self, data, query=None):
yield self._eval_db(self.table, 'remove', query)
self.finish_request()
- @authenticate
- def _update(self, query, db_keys):
- if self.json_args is None:
- raise web.HTTPError(constants.HTTP_BAD_REQUEST, "No payload")
-
- # check old data exist
- from_data = yield self._eval_db_find_one(query)
- if from_data is None:
- raise web.HTTPError(constants.HTTP_NOT_FOUND,
- "{} could not be found in table [{}]"
- .format(query, self.table))
-
- data = self.table_cls.from_dict(from_data)
- # check new data exist
- equal, new_query = self._update_query(db_keys, data)
- if not equal:
- to_data = yield self._eval_db_find_one(new_query)
- if to_data is not None:
- raise web.HTTPError(constants.HTTP_FORBIDDEN,
- "{} already exists in table [{}]"
- .format(new_query, self.table))
-
- # we merge the whole document """
- edit_request = self._update_requests(data)
-
- """ Updating the DB """
- yield self._eval_db(self.table, 'update', query, edit_request,
+ @check.authenticate
+ @check.no_body
+ @check.not_exist
+ @check.updated_one_not_exist
+ def _update(self, data, query=None, **kwargs):
+ data = self.table_cls.from_dict(data)
+ update_req = self._update_requests(data)
+ yield self._eval_db(self.table, 'update', query, update_req,
check_keys=False)
- edit_request['_id'] = str(data._id)
- self.finish_request(edit_request)
+ update_req['_id'] = str(data._id)
+ self.finish_request(update_req)
def _update_requests(self, data):
request = dict()
request = self._update_request(request, k, v,
data.__getattribute__(k))
if not request:
- raise web.HTTPError(constants.HTTP_FORBIDDEN, "Nothing to update")
+ raises.Forbidden(message.no_update())
edit_request = data.format()
edit_request.update(request)
equal = True
for key in keys:
new = self.json_args.get(key)
- old = data.__getattribute__(key)
+ old = data.get(key)
if new is None:
new = old
elif new != old:
equal = False
query[key] = new
- return equal, query
+ return query if not equal else dict()
def _eval_db(self, table, method, *args, **kwargs):
exec_collection = self.db.__getattr__(table)