leverage LFID as Authentication
[releng.git] / utils / test / testapi / opnfv_testapi / common / check.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corp
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import functools
10
11 import cas
12 from tornado import gen
13 from tornado import web
14
15 from opnfv_testapi.common import constants
16 from opnfv_testapi.common import message
17 from opnfv_testapi.common import raises
18 from opnfv_testapi.common.config import CONF
19 from opnfv_testapi.db import api as dbapi
20
21
22 def login(method):
23     @web.asynchronous
24     @gen.coroutine
25     @functools.wraps(method)
26     def wrapper(self, *args, **kwargs):
27         ticket = self.get_query_argument('ticket', default=None)
28         if ticket:
29             client = cas.CASClient(version='2',
30                                    server_url=CONF.lfid_cas_url,
31                                    service_url=CONF.ui_url)
32             (user, attrs, _) = client.verify_ticket(ticket=ticket)
33             print 'login user: {}'.format(user)
34             login_user = {
35                 'user': user,
36                 'email': attrs.get('mail'),
37                 'fullname': attrs.get('field_lf_full_name'),
38                 'groups': constants.TESTAPI_USERS + attrs.get('group', [])
39             }
40             q_user = {'user': user}
41             db_user = yield dbapi.db_find_one(constants.USER_TABLE, q_user)
42             if not db_user:
43                 dbapi.db_save(constants.USER_TABLE, login_user)
44             else:
45                 dbapi.db_update(constants.USER_TABLE, q_user, login_user)
46
47             self.clear_cookie(constants.TESTAPI_ID)
48             self.set_secure_cookie(constants.TESTAPI_ID, user)
49         ret = yield gen.coroutine(method)(self, *args, **kwargs)
50         raise gen.Return(ret)
51     return wrapper
52
53
54 def authenticate(method):
55     @web.asynchronous
56     @gen.coroutine
57     @functools.wraps(method)
58     def wrapper(self, *args, **kwargs):
59         if self.auth:
60             try:
61                 token = self.request.headers['X-Auth-Token']
62             except KeyError:
63                 raises.Unauthorized(message.unauthorized())
64             query = {'access_token': token}
65             check = yield dbapi.db_find_one('tokens', query)
66             if not check:
67                 raises.Forbidden(message.invalid_token())
68         ret = yield gen.coroutine(method)(self, *args, **kwargs)
69         raise gen.Return(ret)
70     return wrapper
71
72
73 def not_exist(xstep):
74     @functools.wraps(xstep)
75     def wrap(self, *args, **kwargs):
76         query = kwargs.get('query')
77         data = yield dbapi.db_find_one(self.table, query)
78         if not data:
79             raises.NotFound(message.not_found(self.table, query))
80         ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
81         raise gen.Return(ret)
82
83     return wrap
84
85
86 def no_body(xstep):
87     @functools.wraps(xstep)
88     def wrap(self, *args, **kwargs):
89         if self.json_args is None:
90             raises.BadRequest(message.no_body())
91         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
92         raise gen.Return(ret)
93
94     return wrap
95
96
97 def miss_fields(xstep):
98     @functools.wraps(xstep)
99     def wrap(self, *args, **kwargs):
100         fields = kwargs.pop('miss_fields', [])
101         if fields:
102             for miss in fields:
103                 miss_data = self.json_args.get(miss)
104                 if miss_data is None or miss_data == '':
105                     raises.BadRequest(message.missing(miss))
106         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
107         raise gen.Return(ret)
108     return wrap
109
110
111 def carriers_exist(xstep):
112     @functools.wraps(xstep)
113     def wrap(self, *args, **kwargs):
114         carriers = kwargs.pop('carriers', {})
115         if carriers:
116             for table, query in carriers:
117                 exist = yield dbapi.db_find_one(table, query())
118                 if not exist:
119                     raises.Forbidden(message.not_found(table, query()))
120         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
121         raise gen.Return(ret)
122     return wrap
123
124
125 def new_not_exists(xstep):
126     @functools.wraps(xstep)
127     def wrap(self, *args, **kwargs):
128         query = kwargs.get('query')
129         if query:
130             to_data = yield dbapi.db_find_one(self.table, query())
131             if to_data:
132                 raises.Forbidden(message.exist(self.table, query()))
133         ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
134         raise gen.Return(ret)
135     return wrap
136
137
138 def updated_one_not_exist(xstep):
139     @functools.wraps(xstep)
140     def wrap(self, data, *args, **kwargs):
141         db_keys = kwargs.pop('db_keys', [])
142         query = self._update_query(db_keys, data)
143         if query:
144             to_data = yield dbapi.db_find_one(self.table, query)
145             if to_data:
146                 raises.Forbidden(message.exist(self.table, query))
147         ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
148         raise gen.Return(ret)
149     return wrap