escalator should use oslo.xxx instead of oslo-xxx
[escalator.git] / api / escalator / common / rpc.py
1 # Copyright 2013 Red Hat, Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 """
17 RPC Controller
18 """
19 import datetime
20 import traceback
21
22 from oslo.config import cfg
23 from oslo.log import log as logging
24 import oslo.utils.importutils as imp
25 from oslo.utils import timeutils
26 import six
27 from webob import exc
28
29 from escalator.common import client
30 from escalator.common import exception
31 from escalator.common import utils
32 from escalator.common import wsgi
33 from escalator import i18n
34
35 LOG = logging.getLogger(__name__)
36 _ = i18n._
37 _LE = i18n._LE
38
39
40 rpc_opts = [
41     # NOTE(flaper87): Shamelessly copied
42     # from oslo rpc.
43     cfg.ListOpt('allowed_rpc_exception_modules',
44                 default=['openstack.common.exception',
45                          'escalator.common.exception',
46                          'exceptions',
47                          ],
48                 help='Modules of exceptions that are permitted to be recreated'
49                      ' upon receiving exception data from an rpc call.'),
50 ]
51
52 CONF = cfg.CONF
53 CONF.register_opts(rpc_opts)
54
55
56 class RPCJSONSerializer(wsgi.JSONResponseSerializer):
57
58     def _sanitizer(self, obj):
59         def to_primitive(_type, _value):
60             return {"_type": _type, "_value": _value}
61
62         if isinstance(obj, datetime.datetime):
63             return to_primitive("datetime", timeutils.strtime(obj))
64
65         return super(RPCJSONSerializer, self)._sanitizer(obj)
66
67
68 class RPCJSONDeserializer(wsgi.JSONRequestDeserializer):
69
70     def _to_datetime(self, obj):
71         return timeutils.parse_strtime(obj)
72
73     def _sanitizer(self, obj):
74         try:
75             _type, _value = obj["_type"], obj["_value"]
76             return getattr(self, "_to_" + _type)(_value)
77         except (KeyError, AttributeError):
78             return obj
79
80
81 class Controller(object):
82     """
83     Base RPCController.
84
85     This is the base controller for RPC based APIs. Commands
86     handled by this controller respect the following form:
87
88         [{
89             'command': 'method_name',
90             'kwargs': {...}
91         }]
92
93     The controller is capable of processing more than one command
94     per request and will always return a list of results.
95
96     :params raise_exc: Boolean that specifies whether to raise
97     exceptions instead of "serializing" them.
98     """
99
100     def __init__(self, raise_exc=False):
101         self._registered = {}
102         self.raise_exc = raise_exc
103
104     def register(self, resource, filtered=None, excluded=None, refiner=None):
105         """
106         Exports methods through the RPC Api.
107
108         :params resource: Resource's instance to register.
109         :params filtered: List of methods that *can* be registered. Read
110         as "Method must be in this list".
111         :params excluded: List of methods to exclude.
112         :params refiner: Callable to use as filter for methods.
113
114         :raises AssertionError: If refiner is not callable.
115         """
116
117         funcs = filter(lambda x: not x.startswith("_"), dir(resource))
118
119         if filtered:
120             funcs = [f for f in funcs if f in filtered]
121
122         if excluded:
123             funcs = [f for f in funcs if f not in excluded]
124
125         if refiner:
126             assert callable(refiner), "Refiner must be callable"
127             funcs = filter(refiner, funcs)
128
129         for name in funcs:
130             meth = getattr(resource, name)
131
132             if not callable(meth):
133                 continue
134
135             self._registered[name] = meth
136
137     def __call__(self, req, body):
138         """
139         Executes the command
140         """
141
142         if not isinstance(body, list):
143             msg = _("Request must be a list of commands")
144             raise exc.HTTPBadRequest(explanation=msg)
145
146         def validate(cmd):
147             if not isinstance(cmd, dict):
148                 msg = _("Bad Command: %s") % str(cmd)
149                 raise exc.HTTPBadRequest(explanation=msg)
150
151             command, kwargs = cmd.get("command"), cmd.get("kwargs")
152
153             if (not command or not isinstance(command, six.string_types) or
154                     (kwargs and not isinstance(kwargs, dict))):
155                 msg = _("Wrong command structure: %s") % (str(cmd))
156                 raise exc.HTTPBadRequest(explanation=msg)
157
158             method = self._registered.get(command)
159             if not method:
160                 # Just raise 404 if the user tries to
161                 # access a private method. No need for
162                 # 403 here since logically the command
163                 # is not registered to the rpc dispatcher
164                 raise exc.HTTPNotFound(explanation=_("Command not found"))
165
166             return True
167
168         # If more than one command were sent then they might
169         # be intended to be executed sequentially, that for,
170         # lets first verify they're all valid before executing
171         # them.
172         commands = filter(validate, body)
173
174         results = []
175         for cmd in commands:
176             # kwargs is not required
177             command, kwargs = cmd["command"], cmd.get("kwargs", {})
178             method = self._registered[command]
179             try:
180                 result = method(req.context, **kwargs)
181             except Exception as e:
182                 if self.raise_exc:
183                     raise
184
185                 cls, val = e.__class__, utils.exception_to_str(e)
186                 msg = (_LE("RPC Call Error: %(val)s\n%(tb)s") %
187                        dict(val=val, tb=traceback.format_exc()))
188                 LOG.error(msg)
189
190                 # NOTE(flaper87): Don't propagate all exceptions
191                 # but the ones allowed by the user.
192                 module = cls.__module__
193                 if module not in CONF.allowed_rpc_exception_modules:
194                     cls = exception.RPCError
195                     val = six.text_type(exception.RPCError(cls=cls, val=val))
196
197                 cls_path = "%s.%s" % (cls.__module__, cls.__name__)
198                 result = {"_error": {"cls": cls_path, "val": val}}
199             results.append(result)
200         return results
201
202
203 class RPCClient(client.BaseClient):
204
205     def __init__(self, *args, **kwargs):
206         self._serializer = RPCJSONSerializer()
207         self._deserializer = RPCJSONDeserializer()
208
209         self.raise_exc = kwargs.pop("raise_exc", True)
210         self.base_path = kwargs.pop("base_path", '/rpc')
211         super(RPCClient, self).__init__(*args, **kwargs)
212
213     @client.handle_unauthenticated
214     def bulk_request(self, commands):
215         """
216         Execute multiple commands in a single request.
217
218         :params commands: List of commands to send. Commands
219         must respect the following form:
220
221             {
222                 'command': 'method_name',
223                 'kwargs': method_kwargs
224             }
225         """
226         body = self._serializer.to_json(commands)
227         response = super(RPCClient, self).do_request('POST',
228                                                      self.base_path,
229                                                      body)
230         return self._deserializer.from_json(response.read())
231
232     def do_request(self, method, **kwargs):
233         """
234         Simple do_request override. This method serializes
235         the outgoing body and builds the command that will
236         be sent.
237
238         :params method: The remote python method to call
239         :params kwargs: Dynamic parameters that will be
240             passed to the remote method.
241         """
242         content = self.bulk_request([{'command': method,
243                                       'kwargs': kwargs}])
244
245         # NOTE(flaper87): Return the first result if
246         # a single command was executed.
247         content = content[0]
248
249         # NOTE(flaper87): Check if content is an error
250         # and re-raise it if raise_exc is True. Before
251         # checking if content contains the '_error' key,
252         # verify if it is an instance of dict - since the
253         # RPC call may have returned something different.
254         if self.raise_exc and (isinstance(content, dict) and
255                                '_error' in content):
256             error = content['_error']
257             try:
258                 exc_cls = imp.import_class(error['cls'])
259                 raise exc_cls(error['val'])
260             except ImportError:
261                 # NOTE(flaper87): The exception
262                 # class couldn't be imported, using
263                 # a generic exception.
264                 raise exception.RPCError(**error)
265         return content
266
267     def __getattr__(self, item):
268         """
269         This method returns a method_proxy that
270         will execute the rpc call in the registry
271         service.
272         """
273         if item.startswith('_'):
274             raise AttributeError(item)
275
276         def method_proxy(**kw):
277             return self.do_request(item, **kw)
278
279         return method_proxy