leverage LFID authentication to pod creation
[releng.git] / utils / test / testapi / opnfv_testapi / common / check.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corp
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import functools
10 import re
11
12 from tornado import gen
13
14 from opnfv_testapi.common import constants
15 from opnfv_testapi.common import message
16 from opnfv_testapi.common import raises
17 from opnfv_testapi.db import api as dbapi
18
19
20 def is_authorized(method):
21     @functools.wraps(method)
22     def wrapper(self, *args, **kwargs):
23         if self.table in ['pods']:
24             testapi_id = self.get_secure_cookie(constants.TESTAPI_ID)
25             if not testapi_id:
26                 raises.Unauthorized(message.not_login())
27             user_info = yield dbapi.db_find_one('users', {'user': testapi_id})
28             if not user_info:
29                 raises.Unauthorized(message.not_lfid())
30             kwargs['owner'] = testapi_id
31         ret = yield gen.coroutine(method)(self, *args, **kwargs)
32         raise gen.Return(ret)
33     return wrapper
34
35
36 def valid_token(method):
37     @functools.wraps(method)
38     def wrapper(self, *args, **kwargs):
39         if self.auth and self.table == 'results':
40             try:
41                 token = self.request.headers['X-Auth-Token']
42             except KeyError:
43                 raises.Unauthorized(message.unauthorized())
44             query = {'access_token': token}
45             check = yield dbapi.db_find_one('tokens', query)
46             if not check:
47                 raises.Forbidden(message.invalid_token())
48         ret = yield gen.coroutine(method)(self, *args, **kwargs)
49         raise gen.Return(ret)
50     return wrapper
51
52
53 def not_exist(xstep):
54     @functools.wraps(xstep)
55     def wrap(self, *args, **kwargs):
56         query = kwargs.get('query')
57         data = yield dbapi.db_find_one(self.table, query)
58         if not data:
59             raises.NotFound(message.not_found(self.table, query))
60         ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
61         raise gen.Return(ret)
62
63     return wrap
64
65
66 def no_body(xstep):
67     @functools.wraps(xstep)
68     def wrap(self, *args, **kwargs):
69         if self.json_args is None:
70             raises.BadRequest(message.no_body())
71         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
72         raise gen.Return(ret)
73
74     return wrap
75
76
77 def miss_fields(xstep):
78     @functools.wraps(xstep)
79     def wrap(self, *args, **kwargs):
80         fields = kwargs.pop('miss_fields', [])
81         if fields:
82             for miss in fields:
83                 miss_data = self.json_args.get(miss)
84                 if miss_data is None or miss_data == '':
85                     raises.BadRequest(message.missing(miss))
86         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
87         raise gen.Return(ret)
88     return wrap
89
90
91 def carriers_exist(xstep):
92     @functools.wraps(xstep)
93     def wrap(self, *args, **kwargs):
94         carriers = kwargs.pop('carriers', {})
95         if carriers:
96             for table, query in carriers:
97                 exist = yield dbapi.db_find_one(table, query())
98                 if not exist:
99                     raises.Forbidden(message.not_found(table, query()))
100         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
101         raise gen.Return(ret)
102     return wrap
103
104
105 def new_not_exists(xstep):
106     @functools.wraps(xstep)
107     def wrap(self, *args, **kwargs):
108         query = kwargs.get('query')
109         if query:
110             query_data = query()
111             if self.table == 'pods':
112                 if query_data.get('name') is not None:
113                     query_data['name'] = re.compile(query_data.get('name'),
114                                                     re.IGNORECASE)
115             to_data = yield dbapi.db_find_one(self.table, query_data)
116             if to_data:
117                 raises.Forbidden(message.exist(self.table, query()))
118         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
119         raise gen.Return(ret)
120     return wrap
121
122
123 def updated_one_not_exist(xstep):
124     @functools.wraps(xstep)
125     def wrap(self, data, *args, **kwargs):
126         db_keys = kwargs.pop('db_keys', [])
127         query = self._update_query(db_keys, data)
128         if query:
129             to_data = yield dbapi.db_find_one(self.table, query)
130             if to_data:
131                 raises.Forbidden(message.exist(self.table, query))
132         ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
133         raise gen.Return(ret)
134     return wrap