add copyright to files in testAPI
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tornado_swagger / swagger.py
1 ##############################################################################
2 # Copyright (c) 2016 ZTE Corporation
3 # feng.xiaowei@zte.com.cn
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import inspect
10 from functools import wraps
11 from HTMLParser import HTMLParser
12
13 import epydoc.markup
14 import tornado.web
15
16 from settings import default_settings, models
17 from handlers import swagger_handlers
18
19
20 class EpytextParser(HTMLParser):
21     a_text = False
22
23     def __init__(self, tag):
24         HTMLParser.__init__(self)
25         self.tag = tag
26         self.data = None
27
28     def handle_starttag(self, tag, attr):
29         if tag == self.tag:
30             self.a_text = True
31
32     def handle_endtag(self, tag):
33         if tag == self.tag:
34             self.a_text = False
35
36     def handle_data(self, data):
37         if self.a_text:
38             self.data = data
39
40     def get_data(self):
41         return self.data
42
43
44 class DocParser(object):
45     def __init__(self):
46         self.notes = None
47         self.summary = None
48         self.responseClass = None
49         self.responseMessages = []
50         self.params = {}
51         self.properties = {}
52
53     def parse_docstring(self, text):
54         if text is None:
55             return
56
57         errors = []
58         doc = epydoc.markup.parse(text, markup='epytext', errors=errors)
59         _, fields = doc.split_fields(errors)
60
61         for field in fields:
62             tag = field.tag()
63             arg = field.arg()
64             body = field.body()
65             self._get_parser(tag)(arg=arg, body=body)
66         return doc
67
68     def _get_parser(self, tag):
69         parser = {
70             'param': self._parse_param,
71             'type': self._parse_type,
72             'in': self._parse_in,
73             'required': self._parse_required,
74             'rtype': self._parse_rtype,
75             'property': self._parse_property,
76             'ptype': self._parse_ptype,
77             'return': self._parse_return,
78             'raise': self._parse_return,
79             'notes': self._parse_notes,
80             'description': self._parse_description,
81         }
82         return parser.get(tag, self._not_supported)
83
84     def _parse_param(self, **kwargs):
85         arg = kwargs.get('arg', None)
86         body = self._get_body(**kwargs)
87         self.params.setdefault(arg, {}).update({
88             'name': arg,
89             'description': body,
90         })
91
92         if 'paramType' not in self.params[arg]:
93             self.params[arg]['paramType'] = 'query'
94
95     def _parse_type(self, **kwargs):
96         arg = kwargs.get('arg', None)
97         body = self._get_body(**kwargs)
98         self.params.setdefault(arg, {}).update({
99             'name': arg,
100             'dataType': body
101         })
102
103     def _parse_in(self, **kwargs):
104         arg = kwargs.get('arg', None)
105         body = self._get_body(**kwargs)
106         self.params.setdefault(arg, {}).update({
107             'name': arg,
108             'paramType': body
109         })
110
111     def _parse_required(self, **kwargs):
112         arg = kwargs.get('arg', None)
113         body = self._get_body(**kwargs)
114         self.params.setdefault(arg, {}).update({
115             'name': arg,
116             'required': False if body in ['False', 'false'] else True
117         })
118
119     def _parse_rtype(self, **kwargs):
120         body = self._get_body(**kwargs)
121         self.responseClass = body
122
123     def _parse_property(self, **kwargs):
124         arg = kwargs.get('arg', None)
125         self.properties.setdefault(arg, {}).update({
126             'type': 'string'
127         })
128
129     def _parse_ptype(self, **kwargs):
130         arg = kwargs.get('arg', None)
131         code = self._parse_epytext_para('code', **kwargs)
132         link = self._parse_epytext_para('link', **kwargs)
133         if code is None:
134             self.properties.setdefault(arg, {}).update({
135                 'type': link
136             })
137         elif code == 'list':
138             self.properties.setdefault(arg, {}).update({
139                 'type': 'array',
140                 'items': {'type': link}
141             })
142
143     def _parse_return(self, **kwargs):
144         arg = kwargs.get('arg', None)
145         body = self._get_body(**kwargs)
146         self.responseMessages.append({
147             'code': arg,
148             'message': body
149         })
150
151     def _parse_notes(self, **kwargs):
152         body = self._get_body(**kwargs)
153         self.notes = self._sanitize_doc(body)
154
155     def _parse_description(self, **kwargs):
156         body = self._get_body(**kwargs)
157         self.summary = self._sanitize_doc(body)
158
159     def _not_supported(self, **kwargs):
160         pass
161
162     @staticmethod
163     def _sanitize_doc(comment):
164         return comment.replace('\n', '<br/>') if comment else comment
165
166     @staticmethod
167     def _get_body(**kwargs):
168         body = kwargs.get('body', None)
169         return body.to_plaintext(None).strip() if body else body
170
171     @staticmethod
172     def _parse_epytext_para(tag, **kwargs):
173         def _parse_epytext(tag, body):
174             epytextParser = EpytextParser(tag)
175             epytextParser.feed(str(body))
176             data = epytextParser.get_data()
177             epytextParser.close()
178             return data
179
180         body = kwargs.get('body', None)
181         return _parse_epytext(tag, body) if body else body
182
183
184 class model(DocParser):
185     def __init__(self, *args, **kwargs):
186         super(model, self).__init__()
187         self.args = args
188         self.kwargs = kwargs
189         self.required = []
190         self.cls = None
191
192     def __call__(self, *args):
193         if self.cls:
194             return self.cls
195
196         cls = args[0]
197         self._parse_model(cls)
198
199         return cls
200
201     def _parse_model(self, cls):
202         self.id = cls.__name__
203         self.cls = cls
204         if '__init__' in dir(cls):
205             self._parse_args(cls.__init__)
206         self.parse_docstring(inspect.getdoc(cls))
207         models.append(self)
208
209     def _parse_args(self, func):
210         argspec = inspect.getargspec(func)
211         argspec.args.remove("self")
212         defaults = {}
213         if argspec.defaults:
214             defaults = list(zip(argspec.args[-len(argspec.defaults):],
215                                 argspec.defaults))
216         required_args_count = len(argspec.args) - len(defaults)
217         for arg in argspec.args[:required_args_count]:
218             self.required.append(arg)
219             self.properties.setdefault(arg, {'type': 'string'})
220         for arg, default in defaults:
221             self.properties.setdefault(arg, {
222                 'type': 'string',
223                 "default": default
224             })
225
226
227 class operation(DocParser):
228     def __init__(self, nickname='apis', **kwds):
229         super(operation, self).__init__()
230         self.nickname = nickname
231         self.func = None
232         self.func_args = []
233         self.kwds = kwds
234
235     def __call__(self, *args, **kwds):
236         if self.func:
237             return self.func(*args, **kwds)
238
239         func = args[0]
240         self._parse_operation(func)
241
242         @wraps(func)
243         def __wrapper__(*in_args, **in_kwds):
244             return self.func(*in_args, **in_kwds)
245
246         __wrapper__.rest_api = self
247         return __wrapper__
248
249     def _parse_operation(self, func):
250         self.func = func
251
252         self.__name__ = func.__name__
253         self._parse_args(func)
254         self.parse_docstring(inspect.getdoc(self.func))
255
256     def _parse_args(self, func):
257         argspec = inspect.getargspec(func)
258         argspec.args.remove("self")
259
260         defaults = []
261         if argspec.defaults:
262             defaults = argspec.args[-len(argspec.defaults):]
263
264         for arg in argspec.args:
265             if arg in defaults:
266                 required = False
267             else:
268                 required = True
269             self.params.setdefault(arg, {
270                 'name': arg,
271                 'required': required,
272                 'paramType': 'path',
273                 'dataType': 'string'
274             })
275         self.func_args = argspec.args
276
277
278 def docs(**opts):
279     default_settings.update(opts)
280
281
282 class Application(tornado.web.Application):
283     def __init__(self, handlers=None,
284                  default_host="",
285                  transforms=None,
286                  **settings):
287         super(Application, self).__init__(swagger_handlers() + handlers,
288                                           default_host,
289                                           transforms,
290                                           **settings)