Merge "allow authentication to be disabled"
[releng.git] / utils / test / testapi / opnfv_testapi / common / check.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corp
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import functools
10 import re
11
12 from tornado import gen
13
14 from opnfv_testapi.common import constants
15 from opnfv_testapi.common import message
16 from opnfv_testapi.common import raises
17 from opnfv_testapi.common.config import CONF
18 from opnfv_testapi.db import api as dbapi
19
20
21 def is_authorized(method):
22     @functools.wraps(method)
23     def wrapper(self, *args, **kwargs):
24         if CONF.api_authenticate and self.table in ['pods']:
25             testapi_id = self.get_secure_cookie(constants.TESTAPI_ID)
26             if not testapi_id:
27                 raises.Unauthorized(message.not_login())
28             user_info = yield dbapi.db_find_one('users', {'user': testapi_id})
29             if not user_info:
30                 raises.Unauthorized(message.not_lfid())
31             kwargs['owner'] = testapi_id
32         ret = yield gen.coroutine(method)(self, *args, **kwargs)
33         raise gen.Return(ret)
34     return wrapper
35
36
37 def valid_token(method):
38     @functools.wraps(method)
39     def wrapper(self, *args, **kwargs):
40         if self.auth and self.table == 'results':
41             try:
42                 token = self.request.headers['X-Auth-Token']
43             except KeyError:
44                 raises.Unauthorized(message.unauthorized())
45             query = {'access_token': token}
46             check = yield dbapi.db_find_one('tokens', query)
47             if not check:
48                 raises.Forbidden(message.invalid_token())
49         ret = yield gen.coroutine(method)(self, *args, **kwargs)
50         raise gen.Return(ret)
51     return wrapper
52
53
54 def not_exist(xstep):
55     @functools.wraps(xstep)
56     def wrap(self, *args, **kwargs):
57         query = kwargs.get('query')
58         data = yield dbapi.db_find_one(self.table, query)
59         if not data:
60             raises.NotFound(message.not_found(self.table, query))
61         ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
62         raise gen.Return(ret)
63
64     return wrap
65
66
67 def no_body(xstep):
68     @functools.wraps(xstep)
69     def wrap(self, *args, **kwargs):
70         if self.json_args is None:
71             raises.BadRequest(message.no_body())
72         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
73         raise gen.Return(ret)
74
75     return wrap
76
77
78 def miss_fields(xstep):
79     @functools.wraps(xstep)
80     def wrap(self, *args, **kwargs):
81         fields = kwargs.pop('miss_fields', [])
82         if fields:
83             for miss in fields:
84                 miss_data = self.json_args.get(miss)
85                 if miss_data is None or miss_data == '':
86                     raises.BadRequest(message.missing(miss))
87         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
88         raise gen.Return(ret)
89     return wrap
90
91
92 def carriers_exist(xstep):
93     @functools.wraps(xstep)
94     def wrap(self, *args, **kwargs):
95         carriers = kwargs.pop('carriers', {})
96         if carriers:
97             for table, query in carriers:
98                 exist = yield dbapi.db_find_one(table, query())
99                 if not exist:
100                     raises.Forbidden(message.not_found(table, query()))
101         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
102         raise gen.Return(ret)
103     return wrap
104
105
106 def values_check(xstep):
107     @functools.wraps(xstep)
108     def wrap(self, *args, **kwargs):
109         checks = kwargs.pop('values_check', {})
110         if checks:
111             for field, check, options in checks:
112                 if not check(field, options):
113                     raises.BadRequest(message.invalid_value(field, options))
114         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
115         raise gen.Return(ret)
116     return wrap
117
118
119 def new_not_exists(xstep):
120     @functools.wraps(xstep)
121     def wrap(self, *args, **kwargs):
122         query = kwargs.get('query')
123         if query:
124             query_data = query()
125             if self.table == 'pods':
126                 if query_data.get('name') is not None:
127                     query_data['name'] = re.compile(query_data.get('name'),
128                                                     re.IGNORECASE)
129             to_data = yield dbapi.db_find_one(self.table, query_data)
130             if to_data:
131                 raises.Forbidden(message.exist(self.table, query()))
132         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
133         raise gen.Return(ret)
134     return wrap
135
136
137 def updated_one_not_exist(xstep):
138     @functools.wraps(xstep)
139     def wrap(self, data, *args, **kwargs):
140         db_keys = kwargs.pop('db_keys', [])
141         query = self._update_query(db_keys, data)
142         if query:
143             to_data = yield dbapi.db_find_one(self.table, query)
144             if to_data:
145                 raises.Forbidden(message.exist(self.table, query))
146         ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
147         raise gen.Return(ret)
148     return wrap