1 # Copyright 2012 OpenStack Foundation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
19 from oslo_utils import encodeutils
20 from escalatorclient.common import https
21 from escalatorclient.common.utils import safe_header
22 from escalatorclient import exc
23 from oslo_utils import importutils
24 from oslo_utils import netutils
25 from simplejson import decoder
28 from requests.packages.urllib3.exceptions import ProtocolError
30 ProtocolError = requests.exceptions.ConnectionError
32 from six.moves.urllib import parse
37 import simplejson as json
39 # Python 2.5 compat fix
40 if not hasattr(parse, 'parse_qsl'):
42 parse.parse_qsl = cgi.parse_qsl
# Optional profiler hook. Presumably falsy when osprofiler is not
# installed -- confirm against oslo.utils' try_import.
osprofiler_web = importutils.try_import("osprofiler.web")

LOG = logging.getLogger(__name__)
USER_AGENT = 'python-escalatorclient'
# Chunk size in bytes (64kB) used when streaming request and response bodies.
CHUNKSIZE = 64 * 1024
class HTTPClient(object):
    """HTTP client for one endpoint, built on a ``requests`` session.

    Stores the endpoint, auth token / identity headers and TLS settings and
    exposes one helper per HTTP verb on top of :meth:`_request`.
    """

    def __init__(self, endpoint, **kwargs):
        """Create the session and configure auth and TLS from ``kwargs``.

        :param endpoint: Base URL that request paths are joined onto.
        :param kwargs: Recognised keys are ``identity_headers``, ``token``,
                       ``timeout``, ``ssl_compression``, ``cacert``,
                       ``insecure``, ``cert_file`` and ``key_file``.
        """
        self.endpoint = endpoint
        self.identity_headers = kwargs.get('identity_headers')
        self.auth_token = kwargs.get('token')
        if self.identity_headers:
            # A token carried in the identity headers wins over ``token`` and
            # is moved onto the session so it is not sent twice.
            if self.identity_headers.get('X-Auth-Token'):
                self.auth_token = self.identity_headers.get('X-Auth-Token')
                del self.identity_headers['X-Auth-Token']

        self.session = requests.Session()
        self.session.headers["User-Agent"] = USER_AGENT

        if self.auth_token:
            self.session.headers["X-Auth-Token"] = self.auth_token

        self.timeout = float(kwargs.get('timeout', 600))

        if self.endpoint.startswith("https"):
            compression = kwargs.get('ssl_compression', True)

            if not compression:
                # Route through the project's own adapter, selected via a
                # private URL scheme prefix.
                self.session.mount("escalator+https://", https.HTTPSAdapter())
                self.endpoint = 'escalator+' + self.endpoint

                self.session.verify = (
                    kwargs.get('cacert', requests.certs.where()),
                    kwargs.get('insecure', False))
            elif kwargs.get('insecure', False) is True:
                self.session.verify = False
            else:
                # Only override the session default when a CA bundle was
                # really given: '' and an explicit None both keep the default
                # instead of storing None in ``session.verify``.
                cacert = kwargs.get('cacert')
                if cacert:
                    self.session.verify = cacert

            self.session.cert = (kwargs.get('cert_file'),
                                 kwargs.get('key_file'))

    @staticmethod
    def parse_endpoint(endpoint):
        """Return ``endpoint`` split into its URL components."""
        return netutils.urlsplit(endpoint)

    def log_curl_request(self, method, url, headers, data, kwargs):
        """Log, at debug level, a curl command line mirroring the request.

        :param method: HTTP verb.
        :param url: Full request URL.
        :param headers: Per-request headers; session headers are merged in.
        :param data: Request body; only logged when it is a string.
        :param kwargs: Unused; kept for interface compatibility.
        """
        curl = ['curl -g -i -X %s' % method]

        # Work on a copy so the caller's headers are not polluted with the
        # session-level ones.
        headers = copy.deepcopy(headers)
        headers.update(self.session.headers)

        for (key, value) in six.iteritems(headers):
            curl.append('-H \'%s: %s\'' % safe_header(key, value))

        if not self.session.verify:
            curl.append('-k')
        elif isinstance(self.session.verify, six.string_types):
            curl.append(' --cacert %s' % self.session.verify)

        if self.session.cert:
            curl.append(' --cert %s --key %s' % self.session.cert)

        if data and isinstance(data, six.string_types):
            curl.append('-d \'%s\'' % data)

        curl.append(url)

        msg = ' '.join([encodeutils.safe_decode(item, errors='ignore')
                        for item in curl])
        LOG.debug(msg)

    @staticmethod
    def log_http_response(resp, body=None):
        """Log status line, headers and (optionally) body at debug level.

        :param resp: Response object to describe.
        :param body: Already-read body text; omitted from the log when falsy.
        """
        status = (resp.raw.version / 10.0, resp.status_code, resp.reason)
        dump = ['\nHTTP/%.1f %s %s' % status]
        headers = resp.headers.items()
        dump.extend(['%s: %s' % safe_header(k, v) for k, v in headers])
        dump.append('')
        if body:
            body = encodeutils.safe_decode(body)
            dump.extend([body, ''])
        LOG.debug('\n'.join([encodeutils.safe_decode(x, errors='ignore')
                             for x in dump]))

    @staticmethod
    def encode_headers(headers):
        """Encodes headers.

        Note: This should be used right before
        sending anything out.

        :param headers: Headers to encode
        :returns: Dictionary with encoded headers'
                  names and values; entries whose value is None are dropped
        """
        return dict((encodeutils.safe_encode(h), encodeutils.safe_encode(v))
                    for h, v in six.iteritems(headers) if v is not None)

    def _request(self, method, url, **kwargs):
        """Send an HTTP request through the session and decode the reply.

        :param method: HTTP verb, e.g. ``'GET'``.
        :param url: Path joined onto ``self.endpoint``.
        :param kwargs: ``headers`` and ``data`` are consumed here; the rest
                       is forwarded to ``session.request``.
        :returns: ``(resp, body)``. ``body`` is a chunk iterator for
                  ``application/octet-stream`` replies, the decoded JSON
                  otherwise, ``{'status_code': ...}`` when a JSON reply
                  cannot be decoded, or ``None`` when a non-JSON reply
                  cannot be decoded.
        :raises exc.InvalidEndpoint: on timeout or address lookup failure.
        :raises exc.CommunicationError: on connection or socket errors.
        """
        # Copy the headers so we can reuse the original in case of redirects
        headers = kwargs.pop("headers", {})
        headers = copy.deepcopy(headers) if headers else {}

        if self.identity_headers:
            for k, v in six.iteritems(self.identity_headers):
                headers.setdefault(k, v)

        # Default Content-Type is octet-stream
        content_type = headers.get('Content-Type', 'application/octet-stream')

        def chunk_body(body):
            # Stream a file-like object in CHUNKSIZE pieces; stop on the
            # first empty read, whether it is '' or b''.
            while True:
                chunk = body.read(CHUNKSIZE)
                if not chunk:
                    break
                yield chunk

        data = kwargs.pop("data", None)
        # Text and bytes are sent as-is; anything else is JSON-encoded when
        # possible.
        raw_types = six.string_types + (six.binary_type,)
        if data is not None and not isinstance(data, raw_types):
            try:
                data = json.dumps(data)
                content_type = 'application/json'
            except TypeError:
                # Here we assume it's
                # a file-like object
                # and we'll chunk it.
                data = chunk_body(data)

        headers['Content-Type'] = content_type
        stream = content_type == 'application/octet-stream'

        if osprofiler_web:
            headers.update(osprofiler_web.get_trace_id_headers())

        # Note(flaper87): Before letting headers / url fly,
        # they should be encoded otherwise httplib will
        # complain.
        headers = self.encode_headers(headers)

        # Built before the try block so every handler can safely refer to it.
        if self.endpoint.endswith("/") or url.startswith("/"):
            conn_url = "%s%s" % (self.endpoint, url)
        else:
            conn_url = "%s/%s" % (self.endpoint, url)

        try:
            self.log_curl_request(method, conn_url, headers, data, kwargs)
            resp = self.session.request(method,
                                        conn_url,
                                        data=data,
                                        stream=stream,
                                        headers=headers,
                                        **kwargs)
        except requests.exceptions.Timeout as e:
            # The mapping key must match the %(endpoint)s placeholder, or
            # formatting raises KeyError and hides the timeout.
            message = ("Error communicating with %(endpoint)s %(e)s" %
                       dict(endpoint=conn_url, e=e))
            raise exc.InvalidEndpoint(message=message)
        except (requests.exceptions.ConnectionError, ProtocolError) as e:
            message = ("Error finding address for %(url)s: %(e)s" %
                       dict(url=conn_url, e=e))
            raise exc.CommunicationError(message=message)
        except socket.gaierror as e:
            # No hostname attribute is stored on the client, so derive it
            # from the endpoint and fall back to the endpoint itself.
            hostname = (self.parse_endpoint(self.endpoint).hostname or
                        self.endpoint)
            message = "Error finding address for %s: %s" % (hostname, e)
            raise exc.InvalidEndpoint(message=message)
        except (socket.error, socket.timeout) as e:
            endpoint = self.endpoint
            message = ("Error communicating with %(endpoint)s %(e)s" %
                       {'endpoint': endpoint, 'e': e})
            raise exc.CommunicationError(message=message)

        if not resp.ok:
            LOG.debug("Request returned failure status %s.", resp.status_code)
            raise exc.from_response(resp, resp.text)
        elif resp.status_code == requests.codes.MULTIPLE_CHOICES:
            raise exc.from_response(resp)

        content_type = resp.headers.get('Content-Type')

        # Read body into string if it isn't obviously image data
        if content_type == 'application/octet-stream':
            # Do not read all response in memory when
            # downloading an image.
            body_iter = _close_after_stream(resp, CHUNKSIZE)
            self.log_http_response(resp)
            return resp, body_iter

        content = resp.text
        self.log_http_response(resp, content)
        if content_type and content_type.startswith('application/json'):
            # Let's use requests json method,
            # it should take care of response
            # encoding
            try:
                body_iter = resp.json()
            except ValueError:
                # ValueError is the base class of the JSON decode errors, so
                # this covers both simplejson and the stdlib decoder.
                status_body = {'status_code': resp.status_code}
                return resp, status_body
        else:
            try:
                body_iter = json.loads(content)
            except ValueError:
                body_iter = None

        return resp, body_iter

    def head(self, url, **kwargs):
        """Issue a HEAD request; see :meth:`_request`."""
        return self._request('HEAD', url, **kwargs)

    def get(self, url, **kwargs):
        """Issue a GET request; see :meth:`_request`."""
        return self._request('GET', url, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST request; see :meth:`_request`."""
        return self._request('POST', url, **kwargs)

    def put(self, url, **kwargs):
        """Issue a PUT request; see :meth:`_request`."""
        return self._request('PUT', url, **kwargs)

    def patch(self, url, **kwargs):
        """Issue a PATCH request; see :meth:`_request`."""
        return self._request('PATCH', url, **kwargs)

    def delete(self, url, **kwargs):
        """Issue a DELETE request; see :meth:`_request`."""
        return self._request('DELETE', url, **kwargs)
def _close_after_stream(response, chunk_size):
    """Iterate over the content and ensure the response is closed after.

    :param response: Object providing ``iter_content(chunk_size=...)`` and
                     ``close()``.
    :param chunk_size: Size passed through to ``iter_content``.
    :returns: Generator yielding each chunk of the response body.
    """
    try:
        # Yield each chunk in the response body
        for chunk in response.iter_content(chunk_size=chunk_size):
            yield chunk
    finally:
        # Once we're done streaming the body, ensure everything is closed.
        # This will return the connection to the HTTPConnectionPool in urllib3
        # and ideally reduce the number of HTTPConnectionPool full warnings.
        # Using ``finally`` also covers a consumer that stops early (the
        # generator is closed) and an error raised while streaming.
        response.close()