project-ize testAPI
[releng.git] / utils / test / result_collection_api / opnfv_testapi / tornado_swagger_ui / tornado_swagger / swagger.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 import inspect
5 from functools import wraps
6 import epydoc.markup
7 from HTMLParser import HTMLParser
8 import tornado.web
9 from settings import default_settings, models
10 from handlers import swagger_handlers
11
12 __author__ = 'serena'
13
14
15 class EpytextParser(HTMLParser):
16     a_text = False
17
18     def __init__(self, tag):
19         HTMLParser.__init__(self)
20         self.tag = tag
21         self.data = None
22
23     def handle_starttag(self, tag, attr):
24         if tag == self.tag:
25             self.a_text = True
26
27     def handle_endtag(self, tag):
28         if tag == self.tag:
29             self.a_text = False
30
31     def handle_data(self, data):
32         if self.a_text:
33             self.data = data
34
35     def get_data(self):
36         return self.data
37
38
39 class DocParser(object):
40     def __init__(self):
41         self.notes = None
42         self.summary = None
43         self.responseClass = None
44         self.responseMessages = []
45         self.params = {}
46         self.properties = {}
47
48     def parse_docstring(self, text):
49         if text is None:
50             return
51
52         errors = []
53         doc = epydoc.markup.parse(text, markup='epytext', errors=errors)
54         _, fields = doc.split_fields(errors)
55
56         for field in fields:
57             tag = field.tag()
58             arg = field.arg()
59             body = field.body()
60             self._get_parser(tag)(arg=arg, body=body)
61         return doc
62
63     def _get_parser(self, tag):
64         parser = {
65             'param': self._parse_param,
66             'type': self._parse_type,
67             'in': self._parse_in,
68             'required': self._parse_required,
69             'rtype': self._parse_rtype,
70             'property': self._parse_property,
71             'ptype': self._parse_ptype,
72             'return': self._parse_return,
73             'raise': self._parse_return,
74             'notes': self._parse_notes,
75             'description': self._parse_description,
76         }
77         return parser.get(tag, self._not_supported)
78
79     def _parse_param(self, **kwargs):
80         arg = kwargs.get('arg', None)
81         body = self._get_body(**kwargs)
82         self.params.setdefault(arg, {}).update({
83             'name': arg,
84             'description': body,
85         })
86
87         if 'paramType' not in self.params[arg]:
88             self.params[arg]['paramType'] = 'query'
89
90     def _parse_type(self, **kwargs):
91         arg = kwargs.get('arg', None)
92         body = self._get_body(**kwargs)
93         self.params.setdefault(arg, {}).update({
94             'name': arg,
95             'dataType': body
96         })
97
98     def _parse_in(self, **kwargs):
99         arg = kwargs.get('arg', None)
100         body = self._get_body(**kwargs)
101         self.params.setdefault(arg, {}).update({
102             'name': arg,
103             'paramType': body
104         })
105
106     def _parse_required(self, **kwargs):
107         arg = kwargs.get('arg', None)
108         body = self._get_body(**kwargs)
109         self.params.setdefault(arg, {}).update({
110             'name': arg,
111             'required': False if body in ['False', 'false'] else True
112         })
113
114     def _parse_rtype(self, **kwargs):
115         body = self._get_body(**kwargs)
116         self.responseClass = body
117
118     def _parse_property(self, **kwargs):
119         arg = kwargs.get('arg', None)
120         self.properties.setdefault(arg, {}).update({
121             'type': 'string'
122         })
123
124     def _parse_ptype(self, **kwargs):
125         arg = kwargs.get('arg', None)
126         code = self._parse_epytext_para('code', **kwargs)
127         link = self._parse_epytext_para('link', **kwargs)
128         if code is None:
129             self.properties.setdefault(arg, {}).update({
130                 'type': link
131             })
132         elif code == 'list':
133             self.properties.setdefault(arg, {}).update({
134                 'type': 'array',
135                 'items': {'type': link}
136             })
137
138     def _parse_return(self, **kwargs):
139         arg = kwargs.get('arg', None)
140         body = self._get_body(**kwargs)
141         self.responseMessages.append({
142             'code': arg,
143             'message': body
144         })
145
146     def _parse_notes(self, **kwargs):
147         body = self._get_body(**kwargs)
148         self.notes = self._sanitize_doc(body)
149
150     def _parse_description(self, **kwargs):
151         body = self._get_body(**kwargs)
152         self.summary = self._sanitize_doc(body)
153
154     def _not_supported(self, **kwargs):
155         pass
156
157     @staticmethod
158     def _sanitize_doc(comment):
159         return comment.replace('\n', '<br/>') if comment else comment
160
161     @staticmethod
162     def _get_body(**kwargs):
163         body = kwargs.get('body', None)
164         return body.to_plaintext(None).strip() if body else body
165
166     @staticmethod
167     def _parse_epytext_para(tag, **kwargs):
168         def _parse_epytext(tag, body):
169             epytextParser = EpytextParser(tag)
170             epytextParser.feed(str(body))
171             data = epytextParser.get_data()
172             epytextParser.close()
173             return data
174
175         body = kwargs.get('body', None)
176         return _parse_epytext(tag, body) if body else body
177
178
179 class model(DocParser):
180     def __init__(self, *args, **kwargs):
181         super(model, self).__init__()
182         self.args = args
183         self.kwargs = kwargs
184         self.required = []
185         self.cls = None
186
187     def __call__(self, *args):
188         if self.cls:
189             return self.cls
190
191         cls = args[0]
192         self._parse_model(cls)
193
194         return cls
195
196     def _parse_model(self, cls):
197         self.id = cls.__name__
198         self.cls = cls
199         if '__init__' in dir(cls):
200             self._parse_args(cls.__init__)
201         self.parse_docstring(inspect.getdoc(cls))
202         models.append(self)
203
204     def _parse_args(self, func):
205         argspec = inspect.getargspec(func)
206         argspec.args.remove("self")
207         defaults = {}
208         if argspec.defaults:
209             defaults = list(zip(argspec.args[-len(argspec.defaults):],
210                                 argspec.defaults))
211         required_args_count = len(argspec.args) - len(defaults)
212         for arg in argspec.args[:required_args_count]:
213             self.required.append(arg)
214             self.properties.setdefault(arg, {'type': 'string'})
215         for arg, default in defaults:
216             self.properties.setdefault(arg, {
217                 'type': 'string',
218                 "default": default
219             })
220
221
222 class operation(DocParser):
223     def __init__(self, nickname='apis', **kwds):
224         super(operation, self).__init__()
225         self.nickname = nickname
226         self.func = None
227         self.func_args = []
228         self.kwds = kwds
229
230     def __call__(self, *args, **kwds):
231         if self.func:
232             return self.func(*args, **kwds)
233
234         func = args[0]
235         self._parse_operation(func)
236
237         @wraps(func)
238         def __wrapper__(*in_args, **in_kwds):
239             return self.func(*in_args, **in_kwds)
240
241         __wrapper__.rest_api = self
242         return __wrapper__
243
244     def _parse_operation(self, func):
245         self.func = func
246
247         self.__name__ = func.__name__
248         self._parse_args(func)
249         self.parse_docstring(inspect.getdoc(self.func))
250
251     def _parse_args(self, func):
252         argspec = inspect.getargspec(func)
253         argspec.args.remove("self")
254
255         defaults = []
256         if argspec.defaults:
257             defaults = argspec.args[-len(argspec.defaults):]
258
259         for arg in argspec.args:
260             if arg in defaults:
261                 required = False
262             else:
263                 required = True
264             self.params.setdefault(arg, {
265                 'name': arg,
266                 'required': required,
267                 'paramType': 'path',
268                 'dataType': 'string'
269             })
270         self.func_args = argspec.args
271
272
273 def docs(**opts):
274     default_settings.update(opts)
275
276
277 class Application(tornado.web.Application):
278     def __init__(self, handlers=None,
279                  default_host="",
280                  transforms=None,
281                  **settings):
282         super(Application, self).__init__(swagger_handlers() + handlers,
283                                           default_host,
284                                           transforms,
285                                           **settings)