1 # Copyright 2010 United States Government as represented by the
2 # Administrator of the National Aeronautics and Space Administration.
3 # Copyright 2010 OpenStack Foundation
4 # Copyright 2014 IBM Corp.
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
20 Utility methods for working with WSGI servers
22 from __future__ import print_function
32 from eventlet.green import socket
33 from eventlet.green import ssl
34 import eventlet.greenio
36 from oslo_serialization import jsonutils
37 from oslo_concurrency import processutils
38 from oslo_config import cfg
39 from oslo_log import log as logging
40 from oslo_log import loggers
42 import routes.middleware
46 from webob import multidict
48 from escalator.common import exception
49 from escalator.common import utils
50 from escalator import i18n
# NOTE(review): the list delimiters, ``CONF = cfg.CONF`` and the tails of the
# cert_file / key_file / http_keepalive help strings were not visible in the
# reviewed extract and have been restored from context -- confirm them.

# Address the API server listens on.
bind_opts = [
    cfg.StrOpt('bind_host', default='0.0.0.0',
               help=_('Address to bind the server. Useful when '
                      'selecting a particular network interface.')),
    cfg.IntOpt('bind_port',
               help=_('The port on which the server will listen.')),
]

# Listening-socket tuning and the TLS material used to wrap it.
socket_opts = [
    cfg.IntOpt('backlog', default=4096,
               help=_('The backlog value that will be used when creating the '
                      'TCP listener socket.')),
    cfg.IntOpt('tcp_keepidle', default=600,
               help=_('The value for the socket option TCP_KEEPIDLE. This is '
                      'the time in seconds that the connection must be idle '
                      'before TCP starts sending keepalive probes.')),
    cfg.StrOpt('ca_file', help=_('CA certificate file to use to verify '
                                 'connecting clients.')),
    cfg.StrOpt('cert_file', help=_('Certificate file to use when starting API '
                                   'server securely.')),
    cfg.StrOpt('key_file', help=_('Private key file to use when starting API '
                                  'server securely.')),
]

# Worker-process and eventlet WSGI server behaviour.
eventlet_opts = [
    cfg.IntOpt('workers', default=processutils.get_worker_count(),
               help=_('The number of child process workers that will be '
                      'created to service requests. The default will be '
                      'equal to the number of CPUs available.')),
    cfg.IntOpt('max_header_line', default=16384,
               help=_('Maximum line size of message headers to be accepted. '
                      'max_header_line may need to be increased when using '
                      'large tokens (typically those generated by the '
                      'Keystone v3 API with big service catalogs).')),
    cfg.BoolOpt('http_keepalive', default=True,
                help=_('If False, server will return the header '
                       '"Connection: close", '
                       'If True, server will return "Connection: Keep-Alive" '
                       'in its responses. In order to close the client socket '
                       'connection explicitly after the response is sent and '
                       'read successfully by the client, you simply have to '
                       'set this option to False when you create a wsgi '
                       'server.')),
]

# Profiling switches, registered under the "profiler" group below.
profiler_opts = [
    cfg.BoolOpt("enabled", default=False,
                help=_('If False fully disable profiling feature.')),
    cfg.BoolOpt("trace_sqlalchemy", default=False,
                help=_("If False doesn't trace SQL requests."))
]


LOG = logging.getLogger(__name__)

CONF = cfg.CONF
CONF.register_opts(bind_opts)
CONF.register_opts(socket_opts)
CONF.register_opts(eventlet_opts)
CONF.register_opts(profiler_opts, group="profiler")

# Pools handed out by get_asynchronous_eventlet_pool(); Server.run_server()
# waits on each of them after the WSGI server returns.
ASYNC_EVENTLET_THREAD_POOL_LIST = []
def get_bind_addr(default_port=None):
    """Build the ``(host, port)`` pair the server should bind to.

    :param default_port: port used when ``bind_port`` is unset (or falsy)
                         in the configuration
    :returns: tuple of the configured ``bind_host`` and the selected port
    """
    host = CONF.bind_host
    port = CONF.bind_port or default_port
    return (host, port)
def ssl_wrap_socket(sock):
    """
    Wrap an existing socket in SSL

    The configured key/cert pair is validated first. Client certificates
    are only required when a ``ca_file`` is configured.

    :param sock: non-SSL socket to wrap

    :returns: An SSL wrapped socket
    """
    utils.validate_key_cert(CONF.key_file, CONF.cert_file)

    # NOTE(review): 'server_side' was not visible in the reviewed extract;
    # it is assumed here because this wraps a listening socket -- confirm.
    ssl_kwargs = dict(
        server_side=True,
        certfile=CONF.cert_file,
        keyfile=CONF.key_file,
        cert_reqs=ssl.CERT_NONE,
    )

    if CONF.ca_file:
        # A CA bundle means connecting clients must present a certificate.
        ssl_kwargs.update(ca_certs=CONF.ca_file,
                          cert_reqs=ssl.CERT_REQUIRED)

    return ssl.wrap_socket(sock, **ssl_kwargs)
def get_socket(default_port):
    """
    Bind socket to bind ip:port in conf

    note: Mostly comes from Swift with a few small changes...

    :param default_port: port to bind to if none is specified in conf

    :returns: a listening socket (the test-suite socket when one is
              provided, otherwise the result of eventlet.listen)
    :raises RuntimeError: when only one of cert_file/key_file is set, or
                          when the address is still in use after 30 seconds
    """
    bind_addr = get_bind_addr(default_port)
    host, port = bind_addr[0], bind_addr[1]

    # TODO(jaypipes): eventlet's greened socket module does not actually
    # support IPv6 in getaddrinfo(). We need to get around this in the
    # future or monitor upstream for a fix
    # NOTE(review): the getaddrinfo() arguments after the host were not
    # visible in the reviewed extract -- confirm against the full file.
    candidate_families = []
    for info in socket.getaddrinfo(host, port,
                                   socket.AF_UNSPEC,
                                   socket.SOCK_STREAM):
        if info[0] in (socket.AF_INET, socket.AF_INET6):
            candidate_families.append(info[0])
    address_family = candidate_families[0]

    # SSL needs both halves of the key pair; exactly one being set is an
    # operator error.
    if bool(CONF.key_file) != bool(CONF.cert_file):
        raise RuntimeError(_("When running server in SSL mode, you must "
                             "specify both a cert_file and key_file "
                             "option value in your configuration file"))

    sock = utils.get_test_suite_socket()
    retry_until = time.time() + 30

    # Keep retrying while the address is still held by another process.
    while not sock and time.time() < retry_until:
        try:
            sock = eventlet.listen(bind_addr,
                                   backlog=CONF.backlog,
                                   family=address_family)
        except socket.error as err:
            if err.args[0] != errno.EADDRINUSE:
                raise
            # NOTE(review): pause length assumed -- confirm.
            eventlet.sleep(0.1)

    if sock:
        return sock

    raise RuntimeError(_("Could not bind to %(host)s:%(port)s after"
                         " trying for 30 seconds") %
                       {'host': bind_addr[0],
                        'port': bind_addr[1]})
def set_eventlet_hub():
    """Select an eventlet hub, preferring 'poll' and falling back to 'selects'.

    :raises exception.WorkerCreationFailure: when neither hub can be used
    """
    for hub_name in ('poll', 'selects'):
        try:
            eventlet.hubs.use_hub(hub_name)
        except Exception:
            # This hub is unavailable here; try the next candidate.
            continue
        return

    msg = _("eventlet 'poll' nor 'selects' hubs are available "
            "on this platform")
    # NOTE(review): the keyword name 'reason' was not visible in the
    # reviewed extract -- confirm against WorkerCreationFailure.
    raise exception.WorkerCreationFailure(
        reason=msg)
def get_asynchronous_eventlet_pool(size=1000):
    """Create an eventlet pool and remember it for shutdown.

    Every pool created here is recorded in the module-level
    ASYNC_EVENTLET_THREAD_POOL_LIST so it can be waited on
    after getting signal for graceful shutdown.

    :param size: eventlet pool size
    :returns: eventlet pool
    """
    new_pool = eventlet.GreenPool(size=size)
    # The list is only mutated, never rebound, so no ``global`` is needed.
    ASYNC_EVENTLET_THREAD_POOL_LIST.append(new_pool)
    return new_pool
class Server(object):
    """Server class to manage multiple WSGI sockets and applications.

    With ``workers == 0`` the application is served from a green thread in
    the current process; otherwise the process forks ``workers`` children
    and supervises them.

    NOTE(review): several lines of this class (``self.running``, a few
    method headers, ``try``/``else``/``raise``/``return`` lines) were not
    visible in the reviewed extract and were restored from the surrounding
    control flow -- confirm against the full file.
    """

    def __init__(self, threads=1000):
        """
        :param threads: size of the green thread pool serving requests
        """
        os.umask(0o27)  # ensure files are created with the correct privileges
        self._logger = logging.getLogger("eventlet.wsgi.server")
        self._wsgi_logger = loggers.WritableLogger(self._logger)
        self.threads = threads
        self.children = set()
        self.stale_children = set()
        self.running = True
        self.pgid = os.getpid()
        try:
            # NOTE(flaper87): Make sure this process
            # runs in its own process group.
            os.setpgid(self.pgid, self.pgid)
        except OSError:
            # NOTE(flaper87): When running escalator-control,
            # (escalator's functional tests, for example)
            # setpgid fails with EPERM as escalator-control
            # creates a fresh session, of which the newly
            # launched service becomes the leader (session
            # leaders may not change process groups)
            #
            # Running escalator-(api) is safe and
            # shouldn't raise any error here.
            self.pgid = 0

    def hup(self, *args):
        """
        Reloads configuration files with zero down time

        Installed as the parent's SIGHUP handler: further SIGHUPs are
        ignored and SIGHUPInterrupt is raised for wait_on_children().
        """
        signal.signal(signal.SIGHUP, signal.SIG_IGN)
        raise exception.SIGHUPInterrupt

    def kill_children(self, *args):
        """Kills the entire process group."""
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.running = False
        os.killpg(self.pgid, signal.SIGTERM)

    def start(self, application, default_port):
        """
        Run a WSGI server with the given application.

        :param application: The application to be run in the WSGI server
        :param default_port: Port to bind to if none is specified in conf
        """
        self.application = application
        self.default_port = default_port
        self.configure()
        self.start_wsgi()

    def start_wsgi(self):
        """Start serving, in-process or through forked workers."""
        if CONF.workers == 0:
            # Useful for profiling, test, debug etc.
            self.pool = self.create_pool()
            self.pool.spawn_n(self._single_run, self.application, self.sock)
            return

        LOG.info(_LI("Starting %d workers") % CONF.workers)
        signal.signal(signal.SIGTERM, self.kill_children)
        signal.signal(signal.SIGINT, self.kill_children)
        signal.signal(signal.SIGHUP, self.hup)
        while len(self.children) < CONF.workers:
            self.run_child()

    def create_pool(self):
        """Return a new green thread pool of ``self.threads`` slots."""
        return eventlet.GreenPool(size=self.threads)

    def _remove_children(self, pid):
        """Forget a reaped child, whether current or stale."""
        if pid in self.children:
            self.children.remove(pid)
            LOG.info(_LI('Removed dead child %s') % pid)
        elif pid in self.stale_children:
            self.stale_children.remove(pid)
            LOG.info(_LI('Removed stale child %s') % pid)
        else:
            LOG.warning(_LW('Unrecognised child %s') % pid)

    def _verify_and_respawn_children(self, pid, status):
        """Replace a reaped child unless it exited with a non-zero status."""
        if not self.stale_children:
            LOG.debug('No stale children')
        if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
            LOG.error(_LE('Not respawning child %d, cannot '
                          'recover from termination') % pid)
            if not self.children and not self.stale_children:
                LOG.info(
                    _LI('All workers have terminated. Exiting'))
                self.running = False
        elif len(self.children) < CONF.workers:
            self.run_child()

    def wait_on_children(self):
        """Reap and respawn workers until asked to stop, then close up."""
        while self.running:
            try:
                pid, status = os.wait()
                if os.WIFEXITED(status) or os.WIFSIGNALED(status):
                    self._remove_children(pid)
                    self._verify_and_respawn_children(pid, status)
            except OSError as err:
                # Interrupted waits and "no children" are expected here.
                if err.errno not in (errno.EINTR, errno.ECHILD):
                    raise
            except KeyboardInterrupt:
                LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
                break
            except exception.SIGHUPInterrupt:
                self.reload()
                continue
        eventlet.greenio.shutdown_safe(self.sock)
        self.sock.close()
        LOG.debug('Exited')

    def configure(self, old_conf=None, has_changed=None):
        """
        Apply configuration settings

        :param old_conf: Cached old configuration settings (if any)
        :param has_changed: callable to determine if a parameter has changed
        """
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        self.configure_socket(old_conf, has_changed)

    def reload(self):
        """
        Reload and re-apply configuration settings

        Existing child processes are sent a SIGHUP signal
        and will exit after completing existing requests.
        New child processes, which will have the updated
        configuration, are spawned. This allows preventing
        interruption to the service.
        """
        def _has_changed(old, new, param):
            old = old.get(param)
            new = getattr(new, param)
            return (new != old)

        old_conf = utils.stash_conf_values()
        has_changed = functools.partial(_has_changed, old_conf, CONF)
        CONF.reload_config_files()
        os.killpg(self.pgid, signal.SIGHUP)
        # Current workers become stale; they are reaped as they exit.
        self.stale_children = self.children
        self.children = set()

        # Ensure any logging config changes are picked up
        logging.setup(CONF, 'escalator')

        self.configure(old_conf, has_changed)
        self.start_wsgi()

    def wait(self):
        """Wait until all servers have completed running."""
        try:
            if self.children:
                self.wait_on_children()
            else:
                self.pool.waitall()
        except KeyboardInterrupt:
            pass

    def run_child(self):
        """Fork one worker; the child serves requests and then exits."""
        def child_hup(*args):
            """Shuts down child processes, existing requests are handled."""
            signal.signal(signal.SIGHUP, signal.SIG_IGN)
            eventlet.wsgi.is_accepting = False
            self.sock.close()

        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGHUP, child_hup)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            # ignore the interrupt signal to avoid a race whereby
            # a child worker receives the signal before the parent
            # and is respawned unnecessarily as a result
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            # The child has no need to stash the unwrapped
            # socket, and the reference prevents a clean
            # exit on sighup
            self._sock = None
            self.run_server()
            LOG.info(_LI('Child %d exiting normally') % os.getpid())
            # self.pool.waitall() is now called in wsgi's server so
            # it's safe to exit here
            sys.exit(0)
        else:
            LOG.info(_LI('Started child %s') % pid)
            self.children.add(pid)

    def run_server(self):
        """Run a WSGI server."""
        if cfg.CONF.pydev_worker_debug_host:
            utils.setup_remote_pydev_debug(cfg.CONF.pydev_worker_debug_host,
                                           cfg.CONF.pydev_worker_debug_port)

        eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
        self.pool = self.create_pool()
        try:
            eventlet.wsgi.server(self.sock,
                                 self.application,
                                 log=self._wsgi_logger,
                                 custom_pool=self.pool,
                                 debug=False,
                                 keepalive=CONF.http_keepalive)
        except socket.error as err:
            # Exceptions are not indexable on Python 3; read the errno from
            # ``args`` as get_socket() does.
            if err.args[0] != errno.EINVAL:
                raise

        # waiting on async pools
        for pool in ASYNC_EVENTLET_THREAD_POOL_LIST:
            pool.waitall()

    def _single_run(self, application, sock):
        """Start a WSGI server in a new green thread."""
        LOG.info(_LI("Starting single process server"))
        eventlet.wsgi.server(sock, application, custom_pool=self.pool,
                             log=self._wsgi_logger,
                             debug=False,
                             keepalive=CONF.http_keepalive)

    def configure_socket(self, old_conf=None, has_changed=None):
        """
        Ensure a socket exists and is appropriately configured.

        This function is called on start up, and can also be
        called in the event of a configuration reload.

        When called for the first time a new socket is created.
        If reloading and either bind_host or bind port have been
        changed the existing socket must be closed and a new
        socket opened (laws of physics).

        In all other cases (bind_host/bind_port have not changed)
        the existing socket is reused.

        :param old_conf: Cached old configuration settings (if any)
        :param has_changed: callable to determine if a parameter has changed
        """
        # Do we need a fresh socket?
        new_sock = (old_conf is None or (
                    has_changed('bind_host') or
                    has_changed('bind_port')))
        # Will we be using https?
        use_ssl = not (not CONF.cert_file or not CONF.key_file)
        # Were we using https before?
        old_use_ssl = (old_conf is not None and not (
                       not old_conf.get('key_file') or
                       not old_conf.get('cert_file')))
        # Do we now need to perform an SSL wrap on the socket?
        wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
        # Do we now need to perform an SSL unwrap on the socket?
        unwrap_sock = use_ssl is False and old_use_ssl is True

        if new_sock:
            self._sock = None
            if old_conf is not None:
                self.sock.close()
            _sock = get_socket(self.default_port)
            _sock.setsockopt(socket.SOL_SOCKET,
                             socket.SO_REUSEADDR, 1)
            # sockets can hang around forever without keepalive
            _sock.setsockopt(socket.SOL_SOCKET,
                             socket.SO_KEEPALIVE, 1)
            self._sock = _sock

        if wrap_sock:
            self.sock = ssl_wrap_socket(self._sock)

        if unwrap_sock:
            self.sock = self._sock

        if new_sock and not use_ssl:
            self.sock = self._sock

        # Pick up newly deployed certs
        if old_conf is not None and use_ssl is True and old_use_ssl is True:
            if has_changed('cert_file') or has_changed('key_file'):
                utils.validate_key_cert(CONF.key_file, CONF.cert_file)
            if has_changed('cert_file'):
                self.sock.certfile = CONF.cert_file
            if has_changed('key_file'):
                self.sock.keyfile = CONF.key_file

        if new_sock or (old_conf is not None and has_changed('tcp_keepidle')):
            # This option isn't available in the OS X version of eventlet
            if hasattr(socket, 'TCP_KEEPIDLE'):
                self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
                                     CONF.tcp_keepidle)

        if old_conf is not None and has_changed('backlog'):
            self.sock.listen(CONF.backlog)
class Middleware(object):
    """
    Base WSGI middleware wrapper. These classes require an application to be
    initialized that will be called next. By default the middleware will
    simply call its wrapped app, or you can override __call__ to customize its
    behavior.

    NOTE(review): the decorators and the ``factory`` body were not visible
    in the reviewed extract and were restored from context -- confirm.
    """

    def __init__(self, application):
        """
        :param application: the WSGI app this middleware wraps
        """
        self.application = application

    @classmethod
    def factory(cls, global_conf, **local_conf):
        """Return a callable that wraps a given app in this middleware."""
        def _wrap(app):
            return cls(app)
        return _wrap

    def process_request(self, req):
        """
        Hook run for every incoming request.

        Returning None lets the wrapped application handle the request.
        Returning a response short-circuits the stack: that response is
        sent back and the wrapped application is not called.
        """
        return None

    def process_response(self, response):
        """Hook for post-processing the response; returns it unchanged."""
        return response

    @webob.dec.wsgify
    def __call__(self, req):
        early_response = self.process_request(req)
        if early_response:
            return early_response

        downstream = req.get_response(self.application)
        downstream.request = req
        try:
            return self.process_response(downstream)
        except webob.exc.HTTPException as http_exc:
            # webob HTTP exceptions are themselves valid responses.
            return http_exc
class Debug(Middleware):
    """
    Diagnostic middleware: dumps the request environ, the response headers
    and the response body to stdout as a request passes through.

    NOTE(review): the decorators, the blank-line prints and the tail of
    ``print_generator`` were not visible in the reviewed extract and were
    restored from context -- confirm.
    """

    @webob.dec.wsgify
    def __call__(self, req):
        banner = "*" * 40

        print(banner + " REQUEST ENVIRON")
        for env_key, env_value in req.environ.items():
            print(env_key, "=", env_value)
        print('')
        resp = req.get_response(self.application)

        print(banner + " RESPONSE HEADERS")
        for (hdr_name, hdr_value) in six.iteritems(resp.headers):
            print(hdr_name, "=", hdr_value)
        print('')

        # The body is echoed lazily, as the server consumes the iterator.
        resp.app_iter = self.print_generator(resp.app_iter)

        return resp

    @staticmethod
    def print_generator(app_iter):
        """
        Yield each chunk of ``app_iter`` unchanged, writing it to stdout
        as it goes by.
        """
        print(("*" * 40) + " BODY")
        for chunk in app_iter:
            sys.stdout.write(chunk)
            sys.stdout.flush()
            yield chunk
        print()
class APIMapper(routes.Mapper):
    """
    Handle route matching when url is '' because routes.Mapper returns
    an error in this case.
    """

    def routematch(self, url=None, environ=None):
        """Match ``url``, special-casing the empty string.

        :param url: URL path to match
        :param environ: WSGI environ handed to the matcher
        :returns: the first two elements of ``_match`` for an empty url,
                  otherwise whatever routes.Mapper.routematch returns
        """
        # NOTE(review): the guard line was not visible in the reviewed
        # extract. It is restored as an equality test; an identity test
        # against a string literal would depend on interning.
        if url == "":
            result = self._match("", environ)
            return result[0], result[1]
        return routes.Mapper.routematch(self, url, environ)
class RejectMethodController(object):
    """Controller whose only action refuses the request with HTTP 405."""

    def reject(self, req, allowed_methods, *args, **kwargs):
        """Log the rejected method and raise HTTPMethodNotAllowed.

        :param req: incoming request; its REQUEST_METHOD is logged
        :param allowed_methods: value sent back in the ``Allow`` header
        :raises webob.exc.HTTPMethodNotAllowed: always
        """
        rejected_method = req.environ['REQUEST_METHOD']
        LOG.debug("The method %s is not allowed for this resource",
                  rejected_method)
        allow_header = ('Allow', allowed_methods)
        raise webob.exc.HTTPMethodNotAllowed(headers=[allow_header])
class Router(object):
    """
    WSGI middleware that maps incoming requests to WSGI apps.

    NOTE(review): the decorators, the ``_dispatch`` header and a few
    ``return``/``else`` lines were not visible in the reviewed extract and
    were restored from context -- confirm.
    """

    def __init__(self, mapper):
        """
        Create a router for the given routes.Mapper.

        Each route in `mapper` must specify a 'controller', which is a
        WSGI app to call. You'll probably want to specify an 'action' as
        well and have your controller be a wsgi.Controller, who will route
        the request to the action method.

        Examples:
          mapper = routes.Mapper()
          sc = ServerController()

          # Explicit mapping of one route to a controller+action
          mapper.connect(None, "/svrlist", controller=sc, action="list")

          # Actions are all implicitly defined
          mapper.resource("server", "servers", controller=sc)

          # Pointing to an arbitrary WSGI app. You can specify the
          # {path_info:.*} parameter so the target app can be handed just that
          # section of the URL.
          mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp())
        """
        mapper.redirect("", "/")
        self.map = mapper
        self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                          self.map)

    @classmethod
    def factory(cls, global_conf, **local_conf):
        """Build a router around a fresh APIMapper."""
        return cls(APIMapper())

    @webob.dec.wsgify
    def __call__(self, req):
        """
        Hand the request to the routes middleware, which resolves the
        controller from self.map. Unmatched requests end up as
        404(Not Found) or 501(Not Implemented) via _dispatch.
        """
        return self._router

    @staticmethod
    @webob.dec.wsgify
    def _dispatch(req):
        """
        Called by self._router after matching the incoming request to a route
        and putting the information into req.environ. Either returns 404,
        501, or the routed WSGI app's response.
        """
        match = req.environ['wsgiorg.routing_args'][1]
        if match:
            return match['controller']

        # NOTE(review): the tail of this list was not visible in the
        # reviewed extract; 'DELETE' and 'PATCH' are assumed -- confirm.
        implemented_http_methods = ['GET', 'HEAD', 'POST', 'PUT',
                                    'DELETE', 'PATCH']
        if req.environ['REQUEST_METHOD'] in implemented_http_methods:
            return webob.exc.HTTPNotFound()
        return webob.exc.HTTPNotImplemented()
class Request(webob.Request):
    """webob.Request extended with OpenStack API-specific helpers."""

    def best_match_content_type(self):
        """Pick the response content-type; JSON is the only one offered."""
        offered = ('application/json',)
        chosen = self.accept.best_match(offered)
        if chosen:
            return chosen
        return 'application/json'

    def get_content_type(self, allowed_content_types):
        """Return the request body's content type if it is allowed.

        :param allowed_content_types: container of acceptable content types
        :raises exception.InvalidContentType: when the header is missing or
            names a type outside ``allowed_content_types``
        """
        if "Content-Type" not in self.headers:
            raise exception.InvalidContentType(content_type=None)

        declared = self.content_type
        if declared in allowed_content_types:
            return declared
        raise exception.InvalidContentType(content_type=declared)

    def best_match_language(self):
        """Choose a locale based on the Accept-Language header.

        :returns: the best language match or None if the 'Accept-Language'
                  header was not available in the request.
        """
        if self.accept_language:
            available = i18n.get_available_languages('escalator')
            return self.accept_language.best_match(available)
        return None

    def get_content_range(self):
        """Return the parsed `Content-Range` of a request, if one was sent.

        :raises webob.exc.HTTPBadRequest: when the header cannot be parsed
        """
        header_value = self.headers.get('Content-Range')
        if header_value is None:
            return None

        parsed = webob.byterange.ContentRange.parse(header_value)
        if parsed is None:
            msg = _('Malformed Content-Range header: %s') % header_value
            raise webob.exc.HTTPBadRequest(explanation=msg)
        return parsed
class JSONRequestDeserializer(object):
    """Turn the JSON body of a request into a ``{'body': ...}`` dict."""

    # NOTE(review): the tail of this set was not visible in the reviewed
    # extract; 'gzip' and 'identity' are assumed -- confirm.
    valid_transfer_encoding = frozenset(['chunked', 'compress', 'deflate',
                                         'gzip', 'identity'])

    def has_body(self, request):
        """
        Returns whether a Webob.Request object will possess an entity body.

        :param request: Webob.Request object
        """
        request_encoding = request.headers.get('transfer-encoding', '').lower()
        is_valid_encoding = request_encoding in self.valid_transfer_encoding
        if is_valid_encoding and request.is_body_readable:
            return True

        # content_length is None when the header is absent; comparing None
        # with an int raises TypeError on Python 3, so check it explicitly.
        content_length = request.content_length
        return content_length is not None and content_length > 0

    @staticmethod
    def _sanitizer(obj):
        """Sanitizer method that will be passed to jsonutils.loads."""
        return obj

    def from_json(self, datastring):
        """Parse ``datastring`` as JSON.

        :raises webob.exc.HTTPBadRequest: when the text is not valid JSON
        """
        try:
            return jsonutils.loads(datastring, object_hook=self._sanitizer)
        except ValueError:
            msg = _('Malformed JSON in request body.')
            raise webob.exc.HTTPBadRequest(explanation=msg)

    def default(self, request):
        """Return ``{'body': <parsed JSON>}``, or ``{}`` without a body."""
        if self.has_body(request):
            return {'body': self.from_json(request.body)}
        return {}
class JSONResponseSerializer(object):
    """Serialize controller results into a JSON response body."""

    def _sanitizer(self, obj):
        """Sanitizer method that will be passed to jsonutils.dumps."""
        if hasattr(obj, "to_dict"):
            return obj.to_dict()
        if isinstance(obj, multidict.MultiDict):
            return obj.mixed()
        return jsonutils.to_primitive(obj)

    def to_json(self, data):
        """Return ``data`` encoded as a JSON document."""
        return jsonutils.dumps(data, default=self._sanitizer)

    def default(self, response, result):
        """Write ``result`` into ``response`` as application/json.

        :param response: response object to populate
        :param result: value to serialize
        """
        response.content_type = 'application/json'
        body = self.to_json(result)
        # WebOb only accepts bytes for Response.body. The serializer returns
        # text on Python 3, so encode it (a no-op for Python 2 ``str``).
        if not isinstance(body, bytes):
            body = body.encode('utf-8')
        response.body = body
def translate_exception(req, e):
    """Translate the user-visible text of ``e`` into the request's locale.

    :param req: request object; consulted for ``best_match_language``
    :param e: exception to translate in place
    :returns: ``e`` itself
    """
    # The RequestClass attribute in the webob.dec.wsgify decorator
    # does not guarantee that the request object will be a particular
    # type; this check is therefore necessary.
    if hasattr(req, "best_match_language"):
        locale = req.best_match_language()

        if isinstance(e, webob.exc.HTTPError):
            e.explanation = i18n.translate(e.explanation, locale)
            e.detail = i18n.translate(e.detail, locale)
            if getattr(e, 'body_template', None):
                e.body_template = i18n.translate(e.body_template, locale)

    return e
819 class Resource(object):
821 WSGI app that handles (de)serialization and controller dispatch.
823 Reads routing information supplied by RoutesMiddleware and calls
824 the requested action method upon its deserializer, controller,
825 and serializer. Those three objects may implement any of the basic
826 controller action methods (create, update, show, index, delete)
827 along with any that may be specified in the api router. A 'default'
828 method may also be implemented to be used in place of any
829 non-implemented actions. Deserializer methods must accept a request
830 argument and return a dictionary. Controller methods must accept a
831 request argument. Additionally, they must also accept keyword
832 arguments that represent the keys returned by the Deserializer. They
833 may raise a webob.exc exception or return a dict, which will be
834 serialized by requested content type.
837 def __init__(self, controller, deserializer=None, serializer=None):
839 :param controller: object that implement methods created by routes lib
840 :param deserializer: object that supports webob request deserialization
841 through controller-like actions
842 :param serializer: object that supports webob response serialization
843 through controller-like actions
845 self.controller = controller
846 self.serializer = serializer or JSONResponseSerializer()
847 self.deserializer = deserializer or JSONRequestDeserializer()
849 @webob.dec.wsgify(RequestClass=Request)
850 def __call__(self, request):
851 """WSGI method that controls (de)serialization and method dispatch."""
852 action_args = self.get_action_args(request.environ)
853 action = action_args.pop('action', None)
856 deserialized_request = self.dispatch(self.deserializer,
858 action_args.update(deserialized_request)
859 action_result = self.dispatch(self.controller, action,
860 request, **action_args)
861 except webob.exc.WSGIHTTPException as e:
862 exc_info = sys.exc_info()
863 raise translate_exception(request, e), None, exc_info[2]
866 response = webob.Response(request=request)
867 self.dispatch(self.serializer, action, response, action_result)
869 except webob.exc.WSGIHTTPException as e:
870 return translate_exception(request, e)
871 except webob.exc.HTTPException as e:
873 # return unserializable result (typically a webob exc)
877 def dispatch(self, obj, action, *args, **kwargs):
878 """Find action-specific method on self and call it."""
880 method = getattr(obj, action)
881 except AttributeError:
882 method = getattr(obj, 'default')
884 return method(*args, **kwargs)
886 def get_action_args(self, request_environment):
887 """Parse dictionary created by routes library."""
889 args = request_environment['wsgiorg.routing_args'][1].copy()
894 del args['controller']