301eedbbf5546534a9fa811fa2987715d28638f7
[escalator.git] / client / escalatorclient / common / http.py
1 # Copyright 2012 OpenStack Foundation
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 import copy
17 import logging
18 import socket
19 from oslo_utils import encodeutils
20 from escalatorclient.common import https
21 from escalatorclient.common.utils import safe_header
22 from escalatorclient import exc
23 from oslo_utils import importutils
24 from oslo_utils import netutils
25 from simplejson import decoder
26 import requests
27 try:
28     from requests.packages.urllib3.exceptions import ProtocolError
29 except ImportError:
30     ProtocolError = requests.exceptions.ConnectionError
31 import six
32 from six.moves.urllib import parse
33
34 try:
35     import json
36 except ImportError:
37     import simplejson as json
38
39 # Python 2.5 compat fix
40 if not hasattr(parse, 'parse_qsl'):
41     import cgi
42     parse.parse_qsl = cgi.parse_qsl
43
44
45 osprofiler_web = importutils.try_import("osprofiler.web")
46
47 LOG = logging.getLogger(__name__)
48 USER_AGENT = 'python-escalatorclient'
49 CHUNKSIZE = 1024 * 64  # 64kB
50
51
52 class HTTPClient(object):
53
54     def __init__(self, endpoint, **kwargs):
55         self.endpoint = endpoint
56         self.identity_headers = kwargs.get('identity_headers')
57         self.auth_token = kwargs.get('token')
58         if self.identity_headers:
59             if self.identity_headers.get('X-Auth-Token'):
60                 self.auth_token = self.identity_headers.get('X-Auth-Token')
61                 del self.identity_headers['X-Auth-Token']
62
63         self.session = requests.Session()
64         self.session.headers["User-Agent"] = USER_AGENT
65
66         if self.auth_token:
67             self.session.headers["X-Auth-Token"] = self.auth_token
68
69         self.timeout = float(kwargs.get('timeout', 600))
70
71         if self.endpoint.startswith("https"):
72             compression = kwargs.get('ssl_compression', True)
73
74             if not compression:
75                 self.session.mount("escalator+https://", https.HTTPSAdapter())
76                 self.endpoint = 'escalator+' + self.endpoint
77
78                 self.session.verify = (
79                     kwargs.get('cacert', requests.certs.where()),
80                     kwargs.get('insecure', False))
81
82             else:
83                 if kwargs.get('insecure', False) is True:
84                     self.session.verify = False
85                 else:
86                     if kwargs.get('cacert', None) is not '':
87                         self.session.verify = kwargs.get('cacert', True)
88
89             self.session.cert = (kwargs.get('cert_file'),
90                                  kwargs.get('key_file'))
91
92     @staticmethod
93     def parse_endpoint(endpoint):
94         return netutils.urlsplit(endpoint)
95
96     def log_curl_request(self, method, url, headers, data, kwargs):
97         curl = ['curl -g -i -X %s' % method]
98
99         headers = copy.deepcopy(headers)
100         headers.update(self.session.headers)
101
102         for (key, value) in six.iteritems(headers):
103             header = '-H \'%s: %s\'' % safe_header(key, value)
104             curl.append(header)
105
106         if not self.session.verify:
107             curl.append('-k')
108         else:
109             if isinstance(self.session.verify, six.string_types):
110                 curl.append(' --cacert %s' % self.session.verify)
111
112         if self.session.cert:
113             curl.append(' --cert %s --key %s' % self.session.cert)
114
115         if data and isinstance(data, six.string_types):
116             curl.append('-d \'%s\'' % data)
117
118         curl.append(url)
119
120         msg = ' '.join([encodeutils.safe_decode(item, errors='ignore')
121                         for item in curl])
122         LOG.debug(msg)
123
124     @staticmethod
125     def log_http_response(resp, body=None):
126         status = (resp.raw.version / 10.0, resp.status_code, resp.reason)
127         dump = ['\nHTTP/%.1f %s %s' % status]
128         headers = resp.headers.items()
129         dump.extend(['%s: %s' % safe_header(k, v) for k, v in headers])
130         dump.append('')
131         if body:
132             body = encodeutils.safe_decode(body)
133             dump.extend([body, ''])
134         LOG.debug('\n'.join([encodeutils.safe_decode(x, errors='ignore')
135                              for x in dump]))
136
137     @staticmethod
138     def encode_headers(headers):
139         """Encodes headers.
140
141         Note: This should be used right before
142         sending anything out.
143
144         :param headers: Headers to encode
145         :returns: Dictionary with encoded headers'
146                   names and values
147         """
148         return dict((encodeutils.safe_encode(h), encodeutils.safe_encode(v))
149                     for h, v in six.iteritems(headers) if v is not None)
150
151     def _request(self, method, url, **kwargs):
152         """Send an http request with the specified characteristics.
153         Wrapper around httplib.HTTP(S)Connection.request to handle tasks such
154         as setting headers and error handling.
155         """
156         # Copy the kwargs so we can reuse the original in case of redirects
157         headers = kwargs.pop("headers", {})
158         headers = headers and copy.deepcopy(headers) or {}
159
160         if self.identity_headers:
161             for k, v in six.iteritems(self.identity_headers):
162                 headers.setdefault(k, v)
163
164         # Default Content-Type is octet-stream
165         content_type = headers.get('Content-Type', 'application/octet-stream')
166
167         def chunk_body(body):
168             chunk = body
169             while chunk:
170                 chunk = body.read(CHUNKSIZE)
171                 if chunk == '':
172                     break
173                 yield chunk
174
175         data = kwargs.pop("data", None)
176         if data is not None and not isinstance(data, six.string_types):
177             try:
178                 data = json.dumps(data)
179                 content_type = 'application/json'
180             except TypeError:
181                 # Here we assume it's
182                 # a file-like object
183                 # and we'll chunk it
184                 data = chunk_body(data)
185
186         headers['Content-Type'] = content_type
187         stream = True if content_type == 'application/octet-stream' else False
188
189         if osprofiler_web:
190             headers.update(osprofiler_web.get_trace_id_headers())
191
192         # Note(flaper87): Before letting headers / url fly,
193         # they should be encoded otherwise httplib will
194         # complain.
195         headers = self.encode_headers(headers)
196
197         try:
198             if self.endpoint.endswith("/") or url.startswith("/"):
199                 conn_url = "%s%s" % (self.endpoint, url)
200             else:
201                 conn_url = "%s/%s" % (self.endpoint, url)
202             self.log_curl_request(method, conn_url, headers, data, kwargs)
203             resp = self.session.request(method,
204                                         conn_url,
205                                         data=data,
206                                         stream=stream,
207                                         headers=headers,
208                                         **kwargs)
209         except requests.exceptions.Timeout as e:
210             message = ("Error communicating with %(endpoint)s %(e)s" %
211                        dict(url=conn_url, e=e))
212             raise exc.InvalidEndpoint(message=message)
213         except (requests.exceptions.ConnectionError, ProtocolError) as e:
214             message = ("Error finding address for %(url)s: %(e)s" %
215                        dict(url=conn_url, e=e))
216             raise exc.CommunicationError(message=message)
217         except socket.gaierror as e:
218             message = "Error finding address for %s: %s" % (
219                 self.endpoint_hostname, e)
220             raise exc.InvalidEndpoint(message=message)
221         except (socket.error, socket.timeout) as e:
222             endpoint = self.endpoint
223             message = ("Error communicating with %(endpoint)s %(e)s" %
224                        {'endpoint': endpoint, 'e': e})
225             raise exc.CommunicationError(message=message)
226
227         if not resp.ok:
228             LOG.debug("Request returned failure status %s." % resp.status_code)
229             raise exc.from_response(resp, resp.text)
230         elif resp.status_code == requests.codes.MULTIPLE_CHOICES:
231             raise exc.from_response(resp)
232
233         content_type = resp.headers.get('Content-Type')
234
235         # Read body into string if it isn't obviously image data
236         if content_type == 'application/octet-stream':
237             # Do not read all response in memory when
238             # downloading an image.
239             body_iter = _close_after_stream(resp, CHUNKSIZE)
240             self.log_http_response(resp)
241         else:
242             content = resp.text
243             self.log_http_response(resp, content)
244             if content_type and content_type.startswith('application/json'):
245                 # Let's use requests json method,
246                 # it should take care of response
247                 # encoding
248                 try:
249                     body_iter = resp.json()
250                 except decoder.JSONDecodeError:
251                     status_body = {'status_code': resp.status_code}
252                     return resp, status_body
253             else:
254                 body_iter = six.StringIO(content)
255                 try:
256                     body_iter = json.loads(''.join([c for c in body_iter]))
257                 except ValueError:
258                     body_iter = None
259         return resp, body_iter
260
261     def head(self, url, **kwargs):
262         return self._request('HEAD', url, **kwargs)
263
264     def get(self, url, **kwargs):
265         return self._request('GET', url, **kwargs)
266
267     def post(self, url, **kwargs):
268         return self._request('POST', url, **kwargs)
269
270     def put(self, url, **kwargs):
271         return self._request('PUT', url, **kwargs)
272
273     def patch(self, url, **kwargs):
274         return self._request('PATCH', url, **kwargs)
275
276     def delete(self, url, **kwargs):
277         return self._request('DELETE', url, **kwargs)
278
279
280 def _close_after_stream(response, chunk_size):
281     """Iterate over the content and ensure the response is closed after."""
282     # Yield each chunk in the response body
283     for chunk in response.iter_content(chunk_size=chunk_size):
284         yield chunk
285     # Once we're done streaming the body, ensure everything is closed.
286     # This will return the connection to the HTTPConnectionPool in urllib3
287     # and ideally reduce the number of HTTPConnectionPool full warnings.
288     response.close()