1 # Copyright 2013 Red Hat, Inc.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
22 from oslo_config import cfg
23 from oslo_log import log as logging
24 import oslo_utils.importutils as imp
25 from oslo_utils import timeutils
29 from escalator.common import client
30 from escalator.common import exception
31 from escalator.common import utils
32 from escalator.common import wsgi
33 from escalator import i18n
35 LOG = logging.getLogger(__name__)
41 # NOTE(flaper87): Shamelessly copied
43 cfg.ListOpt('allowed_rpc_exception_modules',
44 default=['openstack.common.exception',
45 'escalator.common.exception',
48 help='Modules of exceptions that are permitted to be recreated'
49 ' upon receiving exception data from an rpc call.'),
53 CONF.register_opts(rpc_opts)
56 class RPCJSONSerializer(wsgi.JSONResponseSerializer):
58 def _sanitizer(self, obj):
59 def to_primitive(_type, _value):
60 return {"_type": _type, "_value": _value}
62 if isinstance(obj, datetime.datetime):
63 return to_primitive("datetime", timeutils.strtime(obj))
65 return super(RPCJSONSerializer, self)._sanitizer(obj)
68 class RPCJSONDeserializer(wsgi.JSONRequestDeserializer):
70 def _to_datetime(self, obj):
71 return timeutils.parse_strtime(obj)
73 def _sanitizer(self, obj):
75 _type, _value = obj["_type"], obj["_value"]
76 return getattr(self, "_to_" + _type)(_value)
77 except (KeyError, AttributeError):
81 class Controller(object):
85 This is the base controller for RPC based APIs. Commands
86 handled by this controller respect the following form:
89 'command': 'method_name',
93 The controller is capable of processing more than one command
94 per request and will always return a list of results.
96 :params raise_exc: Boolean that specifies whether to raise
97 exceptions instead of "serializing" them.
100 def __init__(self, raise_exc=False):
101 self._registered = {}
102 self.raise_exc = raise_exc
104 def register(self, resource, filtered=None, excluded=None, refiner=None):
106 Exports methods through the RPC Api.
108 :params resource: Resource's instance to register.
109 :params filtered: List of methods that *can* be registered. Read
110 as "Method must be in this list".
111 :params excluded: List of methods to exclude.
112 :params refiner: Callable to use as filter for methods.
114 :raises AssertionError: If refiner is not callable.
117 funcs = filter(lambda x: not x.startswith("_"), dir(resource))
120 funcs = [f for f in funcs if f in filtered]
123 funcs = [f for f in funcs if f not in excluded]
126 assert callable(refiner), "Refiner must be callable"
127 funcs = filter(refiner, funcs)
130 meth = getattr(resource, name)
132 if not callable(meth):
135 self._registered[name] = meth
137 def __call__(self, req, body):
142 if not isinstance(body, list):
143 msg = _("Request must be a list of commands")
144 raise exc.HTTPBadRequest(explanation=msg)
147 if not isinstance(cmd, dict):
148 msg = _("Bad Command: %s") % str(cmd)
149 raise exc.HTTPBadRequest(explanation=msg)
151 command, kwargs = cmd.get("command"), cmd.get("kwargs")
153 if (not command or not isinstance(command, six.string_types) or
154 (kwargs and not isinstance(kwargs, dict))):
155 msg = _("Wrong command structure: %s") % (str(cmd))
156 raise exc.HTTPBadRequest(explanation=msg)
158 method = self._registered.get(command)
160 # Just raise 404 if the user tries to
161 # access a private method. No need for
162 # 403 here since logically the command
163 # is not registered to the rpc dispatcher
164 raise exc.HTTPNotFound(explanation=_("Command not found"))
168 # If more than one command were sent then they might
169 # be intended to be executed sequentially, that for,
170 # lets first verify they're all valid before executing
172 commands = filter(validate, body)
176 # kwargs is not required
177 command, kwargs = cmd["command"], cmd.get("kwargs", {})
178 method = self._registered[command]
180 result = method(req.context, **kwargs)
181 except Exception as e:
185 cls, val = e.__class__, utils.exception_to_str(e)
186 msg = (_LE("RPC Call Error: %(val)s\n%(tb)s") %
187 dict(val=val, tb=traceback.format_exc()))
190 # NOTE(flaper87): Don't propagate all exceptions
191 # but the ones allowed by the user.
192 module = cls.__module__
193 if module not in CONF.allowed_rpc_exception_modules:
194 cls = exception.RPCError
195 val = six.text_type(exception.RPCError(cls=cls, val=val))
197 cls_path = "%s.%s" % (cls.__module__, cls.__name__)
198 result = {"_error": {"cls": cls_path, "val": val}}
199 results.append(result)
203 class RPCClient(client.BaseClient):
205 def __init__(self, *args, **kwargs):
206 self._serializer = RPCJSONSerializer()
207 self._deserializer = RPCJSONDeserializer()
209 self.raise_exc = kwargs.pop("raise_exc", True)
210 self.base_path = kwargs.pop("base_path", '/rpc')
211 super(RPCClient, self).__init__(*args, **kwargs)
213 @client.handle_unauthenticated
214 def bulk_request(self, commands):
216 Execute multiple commands in a single request.
218 :params commands: List of commands to send. Commands
219 must respect the following form:
222 'command': 'method_name',
223 'kwargs': method_kwargs
226 body = self._serializer.to_json(commands)
227 response = super(RPCClient, self).do_request('POST',
230 return self._deserializer.from_json(response.read())
232 def do_request(self, method, **kwargs):
234 Simple do_request override. This method serializes
235 the outgoing body and builds the command that will
238 :params method: The remote python method to call
239 :params kwargs: Dynamic parameters that will be
240 passed to the remote method.
242 content = self.bulk_request([{'command': method,
245 # NOTE(flaper87): Return the first result if
246 # a single command was executed.
249 # NOTE(flaper87): Check if content is an error
250 # and re-raise it if raise_exc is True. Before
251 # checking if content contains the '_error' key,
252 # verify if it is an instance of dict - since the
253 # RPC call may have returned something different.
254 if self.raise_exc and (isinstance(content, dict) and
255 '_error' in content):
256 error = content['_error']
258 exc_cls = imp.import_class(error['cls'])
259 raise exc_cls(error['val'])
261 # NOTE(flaper87): The exception
262 # class couldn't be imported, using
263 # a generic exception.
264 raise exception.RPCError(**error)
267 def __getattr__(self, item):
269 This method returns a method_proxy that
270 will execute the rpc call in the registry
273 if item.startswith('_'):
274 raise AttributeError(item)
276 def method_proxy(**kw):
277 return self.do_request(item, **kw)