1 # Copyright 2013 Red Hat, Inc.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
22 from oslo.config import cfg
23 from oslo.log import log as logging
24 import oslo.utils.importutils as imp
25 from oslo.utils import timeutils
29 from escalator.common import client
30 from escalator.common import exception
31 from escalator.common import utils
32 from escalator.common import wsgi
33 from escalator import i18n
35 LOG = logging.getLogger(__name__)
41 # NOTE(flaper87): Shamelessly copied
43 cfg.ListOpt('allowed_rpc_exception_modules',
44 default=['openstack.common.exception',
45 'escalator.common.exception',
48 help='Modules of exceptions that are permitted to be recreated'
49 ' upon receiving exception data from an rpc call.'),
53 CONF.register_opts(rpc_opts)
56 class RPCJSONSerializer(wsgi.JSONResponseSerializer):
58 def _sanitizer(self, obj):
59 def to_primitive(_type, _value):
60 return {"_type": _type, "_value": _value}
62 if isinstance(obj, datetime.datetime):
63 return to_primitive("datetime", timeutils.strtime(obj))
65 return super(RPCJSONSerializer, self)._sanitizer(obj)
68 class RPCJSONDeserializer(wsgi.JSONRequestDeserializer):
70 def _to_datetime(self, obj):
71 return timeutils.parse_strtime(obj)
73 def _sanitizer(self, obj):
75 _type, _value = obj["_type"], obj["_value"]
76 return getattr(self, "_to_" + _type)(_value)
77 except (KeyError, AttributeError):
81 class Controller(object):
85 This is the base controller for RPC based APIs. Commands
86 handled by this controller respect the following form:
89 'command': 'method_name',
93 The controller is capable of processing more than one command
94 per request and will always return a list of results.
96 :params raise_exc: Boolean that specifies whether to raise
97 exceptions instead of "serializing" them.
100 def __init__(self, raise_exc=False):
101 self._registered = {}
102 self.raise_exc = raise_exc
104 def register(self, resource, filtered=None, excluded=None, refiner=None):
106 Exports methods through the RPC Api.
108 :params resource: Resource's instance to register.
109 :params filtered: List of methods that *can* be registered. Read
110 as "Method must be in this list".
111 :params excluded: List of methods to exclude.
112 :params refiner: Callable to use as filter for methods.
114 :raises AssertionError: If refiner is not callable.
117 funcs = filter(lambda x: not x.startswith("_"), dir(resource))
120 funcs = [f for f in funcs if f in filtered]
123 funcs = [f for f in funcs if f not in excluded]
126 assert callable(refiner), "Refiner must be callable"
127 funcs = filter(refiner, funcs)
130 meth = getattr(resource, name)
132 if not callable(meth):
135 self._registered[name] = meth
137 def __call__(self, req, body):
142 if not isinstance(body, list):
143 msg = _("Request must be a list of commands")
144 raise exc.HTTPBadRequest(explanation=msg)
147 if not isinstance(cmd, dict):
148 msg = _("Bad Command: %s") % str(cmd)
149 raise exc.HTTPBadRequest(explanation=msg)
151 command, kwargs = cmd.get("command"), cmd.get("kwargs")
153 if (not command or not isinstance(command, six.string_types) or
154 (kwargs and not isinstance(kwargs, dict))):
155 msg = _("Wrong command structure: %s") % (str(cmd))
156 raise exc.HTTPBadRequest(explanation=msg)
158 method = self._registered.get(command)
160 # Just raise 404 if the user tries to
161 # access a private method. No need for
162 # 403 here since logically the command
163 # is not registered to the rpc dispatcher
164 raise exc.HTTPNotFound(explanation=_("Command not found"))
168 # If more than one command were sent then they might
169 # be intended to be executed sequentially, that for,
170 # lets first verify they're all valid before executing
172 commands = filter(validate, body)
176 # kwargs is not required
177 command, kwargs = cmd["command"], cmd.get("kwargs", {})
178 method = self._registered[command]
180 result = method(req.context, **kwargs)
181 except Exception as e:
185 cls, val = e.__class__, utils.exception_to_str(e)
186 msg = (_LE("RPC Call Error: %(val)s\n%(tb)s") %
187 dict(val=val, tb=traceback.format_exc()))
190 # NOTE(flaper87): Don't propagate all exceptions
191 # but the ones allowed by the user.
192 module = cls.__module__
193 if module not in CONF.allowed_rpc_exception_modules:
194 cls = exception.RPCError
195 val = six.text_type(exception.RPCError(cls=cls, val=val))
197 cls_path = "%s.%s" % (cls.__module__, cls.__name__)
198 result = {"_error": {"cls": cls_path, "val": val}}
199 results.append(result)
203 class RPCClient(client.BaseClient):
205 def __init__(self, *args, **kwargs):
206 self._serializer = RPCJSONSerializer()
207 self._deserializer = RPCJSONDeserializer()
209 self.raise_exc = kwargs.pop("raise_exc", True)
210 self.base_path = kwargs.pop("base_path", '/rpc')
211 super(RPCClient, self).__init__(*args, **kwargs)
213 @client.handle_unauthenticated
214 def bulk_request(self, commands):
216 Execute multiple commands in a single request.
218 :params commands: List of commands to send. Commands
219 must respect the following form:
222 'command': 'method_name',
223 'kwargs': method_kwargs
226 body = self._serializer.to_json(commands)
227 response = super(RPCClient, self).do_request('POST',
230 return self._deserializer.from_json(response.read())
232 def do_request(self, method, **kwargs):
234 Simple do_request override. This method serializes
235 the outgoing body and builds the command that will
238 :params method: The remote python method to call
239 :params kwargs: Dynamic parameters that will be
240 passed to the remote method.
242 content = self.bulk_request([{'command': method,
245 # NOTE(flaper87): Return the first result if
246 # a single command was executed.
249 # NOTE(flaper87): Check if content is an error
250 # and re-raise it if raise_exc is True. Before
251 # checking if content contains the '_error' key,
252 # verify if it is an instance of dict - since the
253 # RPC call may have returned something different.
254 if self.raise_exc and (isinstance(content, dict) and
255 '_error' in content):
256 error = content['_error']
258 exc_cls = imp.import_class(error['cls'])
259 raise exc_cls(error['val'])
261 # NOTE(flaper87): The exception
262 # class couldn't be imported, using
263 # a generic exception.
264 raise exception.RPCError(**error)
267 def __getattr__(self, item):
269 This method returns a method_proxy that
270 will execute the rpc call in the registry
273 if item.startswith('_'):
274 raise AttributeError(item)
276 def method_proxy(**kw):
277 return self.do_request(item, **kw)