2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""TZlibTransport provides a compressed transport and transport factory
class, using the python standard library zlib module to implement
data compression.
"""
from __future__ import division

import zlib
from cStringIO import StringIO

from TTransport import TTransportBase, CReadableTransport
class TZlibTransportFactory(object):
    """Factory transport that builds zlib compressed transports.

    This factory caches the last single client/transport that it was passed
    and returns the same TZlibTransport object that was created.

    This caching means the TServer class will get the _same_ transport
    object for both input and output transports from this factory.
    (For non-threaded scenarios only, since the cache only holds one object)

    The purpose of this caching is to allocate only one TZlibTransport where
    only one is really needed (since it must have separate read/write buffers),
    and makes the statistics from getCompSavings() and getCompRatio()
    more accurate.
    """

    # Cache of the last transport passed in and the TZlibTransport built for
    # it.  Declared on the class so both start out as None; getTransport()
    # rebinds them on the instance the first time it builds a transport.
    _last_trans = None
    _last_z = None

    def getTransport(self, trans, compresslevel=9):
        """Wrap a transport, trans, with the TZlibTransport
        compressed transport class, returning a new
        transport to the caller.

        @param trans: The TTransport derived instance to wrap.
        @param compresslevel: The zlib compression level, ranging
        from 0 (no compression) to 9 (best compression).  Defaults to 9.
        @type compresslevel: int

        This method returns a TZlibTransport which wraps the
        passed C{trans} TTransport derived instance.  If C{trans} is the
        very same object passed on the previous call, the previously built
        TZlibTransport is returned and C{compresslevel} is ignored.
        """
        # Identity (not equality) check: the cache is keyed on the exact
        # transport object, so that input and output share one wrapper.
        if trans is self._last_trans:
            return self._last_z
        ztrans = TZlibTransport(trans, compresslevel)
        self._last_trans = trans
        self._last_z = ztrans
        return ztrans
class TZlibTransport(TTransportBase, CReadableTransport):
    """Class that wraps a transport with zlib, compressing writes
    and decompresses reads, using the python standard
    library zlib module.
    """
    # Read buffer size for the python fastbinary C extension,
    # the TBinaryProtocolAccelerated class.
    DEFAULT_BUFFSIZE = 4096

    def __init__(self, trans, compresslevel=9):
        """Create a new TZlibTransport, wrapping C{trans}, another
        TTransport derived object.

        @param trans: A thrift transport object, e.g. a TSocket() object.
        @type trans: TTransport
        @param compresslevel: The zlib compression level, ranging
        from 0 (no compression) to 9 (best compression).  Default is 9.
        @type compresslevel: int
        """
        self.__trans = trans
        self.compresslevel = compresslevel
        self.__rbuf = StringIO()
        self.__wbuf = StringIO()
        self._init_zlib()
        self._init_stats()

    def _reinit_buffers(self):
        """Internal method to initialize/reset the internal StringIO objects
        for read and write buffers.
        """
        self.__rbuf = StringIO()
        self.__wbuf = StringIO()

    def _init_stats(self):
        """Internal method to reset the internal statistics counters
        for compression ratios and bandwidth savings.

        bytes_in / bytes_out count uncompressed bytes; bytes_in_comp /
        bytes_out_comp count the corresponding compressed bytes.
        """
        self.bytes_in = 0
        self.bytes_out = 0
        self.bytes_in_comp = 0
        self.bytes_out_comp = 0

    def _init_zlib(self):
        """Internal method for setting up the zlib compression and
        decompression objects.
        """
        self._zcomp_read = zlib.decompressobj()
        self._zcomp_write = zlib.compressobj(self.compresslevel)

    def getCompRatio(self):
        """Get the current measured compression ratios (in,out) from
        this transport.

        Returns a tuple of:
        (inbound_compression_ratio, outbound_compression_ratio)

        The compression ratios are computed as:
            compressed / uncompressed

        E.g., data that compresses by 10x will have a ratio of: 0.10
        and data that compresses to half of its original size will
        have a ratio of 0.5

        None is returned if no bytes have yet been processed in
        a particular direction.
        """
        r_percent, w_percent = (None, None)
        if self.bytes_in > 0:
            r_percent = self.bytes_in_comp / self.bytes_in
        if self.bytes_out > 0:
            w_percent = self.bytes_out_comp / self.bytes_out
        return (r_percent, w_percent)

    def getCompSavings(self):
        """Get the current count of saved bytes due to data
        compression.

        Returns a tuple of:
        (inbound_saved_bytes, outbound_saved_bytes)

        Note: if compression is actually expanding your
        data (only likely with very tiny thrift objects), then
        the values returned will be negative.
        """
        r_saved = self.bytes_in - self.bytes_in_comp
        w_saved = self.bytes_out - self.bytes_out_comp
        return (r_saved, w_saved)

    def isOpen(self):
        """Return the underlying transport's open status"""
        return self.__trans.isOpen()

    def open(self):
        """Open the underlying transport, resetting the statistics
        counters for the new connection.
        """
        self._init_stats()
        return self.__trans.open()

    def listen(self):
        """Invoke the underlying transport's listen() method"""
        self.__trans.listen()

    def accept(self):
        """Accept connections on the underlying transport"""
        return self.__trans.accept()

    def close(self):
        """Close the underlying transport, discarding buffered data and
        resetting the zlib streams so a reopened transport starts a fresh
        compressed stream.
        """
        self._reinit_buffers()
        self._init_zlib()
        return self.__trans.close()

    def read(self, sz):
        """Read up to sz bytes from the decompressed bytes buffer, and
        read from the underlying transport if the decompression
        buffer is empty.
        """
        ret = self.__rbuf.read(sz)
        if len(ret) > 0:
            return ret
        # keep reading from transport until something comes back
        # NOTE(review): if the wrapped transport returns '' at EOF instead
        # of raising, readComp() keeps returning False and this loops
        # forever -- confirm the wrapped transport raises on EOF.
        while not self.readComp(sz):
            pass
        return self.__rbuf.read(sz)

    def readComp(self, sz):
        """Read compressed data from the underlying transport, then
        decompress it and append it to the internal StringIO read buffer.

        Returns True if the read buffer now holds any data, else False.
        """
        zbuf = self.__trans.read(sz)
        # Count only the compressed bytes freshly taken off the wire.
        self.bytes_in_comp += len(zbuf)
        zbuf = self._zcomp_read.unconsumed_tail + zbuf
        buf = self._zcomp_read.decompress(zbuf)
        # Uncompressed bytes, matching how flush() counts bytes_out.
        self.bytes_in += len(buf)
        old = self.__rbuf.read()
        self.__rbuf = StringIO(old + buf)
        return len(old) + len(buf) > 0

    def write(self, buf):
        """Write some bytes, putting them into the internal write
        buffer for eventual compression.
        """
        self.__wbuf.write(buf)

    def flush(self):
        """Flush any queued up data in the write buffer and ensure the
        compression buffer is flushed out to the underlying transport.
        """
        wout = self.__wbuf.getvalue()
        self.__wbuf = StringIO()
        zbuf = self._zcomp_write.compress(wout)
        self.bytes_out += len(wout)
        self.bytes_out_comp += len(zbuf)
        # Z_SYNC_FLUSH pushes out all pending compressed output without
        # ending the zlib stream, so the peer can decompress this message
        # while the stream stays open for later writes.
        ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
        self.bytes_out_comp += len(ztail)
        if zbuf or ztail:
            self.__trans.write(zbuf + ztail)
        self.__trans.flush()

    @property
    def cstringio_buf(self):
        """Implement the CReadableTransport interface"""
        return self.__rbuf

    def cstringio_refill(self, partialread, reqlen):
        """Implement the CReadableTransport interface for refill.

        Returns a new read buffer holding C{partialread} followed by at
        least C{reqlen} bytes in total.
        """
        retstring = partialread
        # For small requests, read ahead up to DEFAULT_BUFFSIZE bytes once.
        if reqlen < self.DEFAULT_BUFFSIZE:
            retstring += self.read(self.DEFAULT_BUFFSIZE)
        while len(retstring) < reqlen:
            retstring += self.read(reqlen - len(retstring))
        self.__rbuf = StringIO(retstring)
        return self.__rbuf