added sqlite3 support at Domino Server for label subscriptions and client registrations
[domino.git] / lib / thrift / transport / TZlibTransport.py
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 #   http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
17 # under the License.
18 #
19
20 """TZlibTransport provides a compressed transport and transport factory
21 class, using the python standard library zlib module to implement
22 data compression.
23 """
24
25 from __future__ import division
26 import zlib
27 from cStringIO import StringIO
28 from TTransport import TTransportBase, CReadableTransport
29
30
31 class TZlibTransportFactory(object):
32   """Factory transport that builds zlib compressed transports.
33
34   This factory caches the last single client/transport that it was passed
35   and returns the same TZlibTransport object that was created.
36
37   This caching means the TServer class will get the _same_ transport
38   object for both input and output transports from this factory.
39   (For non-threaded scenarios only, since the cache only holds one object)
40
41   The purpose of this caching is to allocate only one TZlibTransport where
42   only one is really needed (since it must have separate read/write buffers),
43   and makes the statistics from getCompSavings() and getCompRatio()
44   easier to understand.
45   """
46   # class scoped cache of last transport given and zlibtransport returned
47   _last_trans = None
48   _last_z = None
49
50   def getTransport(self, trans, compresslevel=9):
51     """Wrap a transport, trans, with the TZlibTransport
52     compressed transport class, returning a new
53     transport to the caller.
54
55     @param compresslevel: The zlib compression level, ranging
56     from 0 (no compression) to 9 (best compression).  Defaults to 9.
57     @type compresslevel: int
58
59     This method returns a TZlibTransport which wraps the
60     passed C{trans} TTransport derived instance.
61     """
62     if trans == self._last_trans:
63       return self._last_z
64     ztrans = TZlibTransport(trans, compresslevel)
65     self._last_trans = trans
66     self._last_z = ztrans
67     return ztrans
68
69
70 class TZlibTransport(TTransportBase, CReadableTransport):
71   """Class that wraps a transport with zlib, compressing writes
72   and decompresses reads, using the python standard
73   library zlib module.
74   """
75   # Read buffer size for the python fastbinary C extension,
76   # the TBinaryProtocolAccelerated class.
77   DEFAULT_BUFFSIZE = 4096
78
79   def __init__(self, trans, compresslevel=9):
80     """Create a new TZlibTransport, wrapping C{trans}, another
81     TTransport derived object.
82
83     @param trans: A thrift transport object, i.e. a TSocket() object.
84     @type trans: TTransport
85     @param compresslevel: The zlib compression level, ranging
86     from 0 (no compression) to 9 (best compression).  Default is 9.
87     @type compresslevel: int
88     """
89     self.__trans = trans
90     self.compresslevel = compresslevel
91     self.__rbuf = StringIO()
92     self.__wbuf = StringIO()
93     self._init_zlib()
94     self._init_stats()
95
96   def _reinit_buffers(self):
97     """Internal method to initialize/reset the internal StringIO objects
98     for read and write buffers.
99     """
100     self.__rbuf = StringIO()
101     self.__wbuf = StringIO()
102
103   def _init_stats(self):
104     """Internal method to reset the internal statistics counters
105     for compression ratios and bandwidth savings.
106     """
107     self.bytes_in = 0
108     self.bytes_out = 0
109     self.bytes_in_comp = 0
110     self.bytes_out_comp = 0
111
112   def _init_zlib(self):
113     """Internal method for setting up the zlib compression and
114     decompression objects.
115     """
116     self._zcomp_read = zlib.decompressobj()
117     self._zcomp_write = zlib.compressobj(self.compresslevel)
118
119   def getCompRatio(self):
120     """Get the current measured compression ratios (in,out) from
121     this transport.
122
123     Returns a tuple of:
124     (inbound_compression_ratio, outbound_compression_ratio)
125
126     The compression ratios are computed as:
127         compressed / uncompressed
128
129     E.g., data that compresses by 10x will have a ratio of: 0.10
130     and data that compresses to half of ts original size will
131     have a ratio of 0.5
132
133     None is returned if no bytes have yet been processed in
134     a particular direction.
135     """
136     r_percent, w_percent = (None, None)
137     if self.bytes_in > 0:
138       r_percent = self.bytes_in_comp / self.bytes_in
139     if self.bytes_out > 0:
140       w_percent = self.bytes_out_comp / self.bytes_out
141     return (r_percent, w_percent)
142
143   def getCompSavings(self):
144     """Get the current count of saved bytes due to data
145     compression.
146
147     Returns a tuple of:
148     (inbound_saved_bytes, outbound_saved_bytes)
149
150     Note: if compression is actually expanding your
151     data (only likely with very tiny thrift objects), then
152     the values returned will be negative.
153     """
154     r_saved = self.bytes_in - self.bytes_in_comp
155     w_saved = self.bytes_out - self.bytes_out_comp
156     return (r_saved, w_saved)
157
158   def isOpen(self):
159     """Return the underlying transport's open status"""
160     return self.__trans.isOpen()
161
162   def open(self):
163     """Open the underlying transport"""
164     self._init_stats()
165     return self.__trans.open()
166
167   def listen(self):
168     """Invoke the underlying transport's listen() method"""
169     self.__trans.listen()
170
171   def accept(self):
172     """Accept connections on the underlying transport"""
173     return self.__trans.accept()
174
175   def close(self):
176     """Close the underlying transport,"""
177     self._reinit_buffers()
178     self._init_zlib()
179     return self.__trans.close()
180
181   def read(self, sz):
182     """Read up to sz bytes from the decompressed bytes buffer, and
183     read from the underlying transport if the decompression
184     buffer is empty.
185     """
186     ret = self.__rbuf.read(sz)
187     if len(ret) > 0:
188       return ret
189     # keep reading from transport until something comes back
190     while True:
191       if self.readComp(sz):
192         break
193     ret = self.__rbuf.read(sz)
194     return ret
195
196   def readComp(self, sz):
197     """Read compressed data from the underlying transport, then
198     decompress it and append it to the internal StringIO read buffer
199     """
200     zbuf = self.__trans.read(sz)
201     zbuf = self._zcomp_read.unconsumed_tail + zbuf
202     buf = self._zcomp_read.decompress(zbuf)
203     self.bytes_in += len(zbuf)
204     self.bytes_in_comp += len(buf)
205     old = self.__rbuf.read()
206     self.__rbuf = StringIO(old + buf)
207     if len(old) + len(buf) == 0:
208       return False
209     return True
210
211   def write(self, buf):
212     """Write some bytes, putting them into the internal write
213     buffer for eventual compression.
214     """
215     self.__wbuf.write(buf)
216
217   def flush(self):
218     """Flush any queued up data in the write buffer and ensure the
219     compression buffer is flushed out to the underlying transport
220     """
221     wout = self.__wbuf.getvalue()
222     if len(wout) > 0:
223       zbuf = self._zcomp_write.compress(wout)
224       self.bytes_out += len(wout)
225       self.bytes_out_comp += len(zbuf)
226     else:
227       zbuf = ''
228     ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
229     self.bytes_out_comp += len(ztail)
230     if (len(zbuf) + len(ztail)) > 0:
231       self.__wbuf = StringIO()
232       self.__trans.write(zbuf + ztail)
233     self.__trans.flush()
234
235   @property
236   def cstringio_buf(self):
237     """Implement the CReadableTransport interface"""
238     return self.__rbuf
239
240   def cstringio_refill(self, partialread, reqlen):
241     """Implement the CReadableTransport interface for refill"""
242     retstring = partialread
243     if reqlen < self.DEFAULT_BUFFSIZE:
244       retstring += self.read(self.DEFAULT_BUFFSIZE)
245     while len(retstring) < reqlen:
246       retstring += self.read(reqlen - len(retstring))
247     self.__rbuf = StringIO(retstring)
248     return self.__rbuf