2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
19 """Implementation of non-blocking server.
21 The main idea of the server is to receive and send requests
22 only from the main thread.
24 The thread poool should be sized for concurrent tasks, not
34 logger = logging.getLogger(__name__)
36 from thrift.transport import TTransport
37 from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
39 __all__ = ['TNonblockingServer']
42 class Worker(threading.Thread):
43 """Worker is a small helper to process incoming connection."""
45 def __init__(self, queue):
46 threading.Thread.__init__(self)
50 """Process queries from task queue, stop if processor is None."""
53 processor, iprot, oprot, otrans, callback = self.queue.get()
56 processor.process(iprot, oprot)
57 callback(True, otrans.getvalue())
59 logger.exception("Exception while processing request")
70 """Decorator which locks self.lock."""
71 def nested(self, *args, **kwargs):
74 return func(self, *args, **kwargs)
80 def socket_exception(func):
81 """Decorator close object on socket.error."""
82 def read(self, *args, **kwargs):
84 return func(self, *args, **kwargs)
91 """Basic class is represented connection.
94 WAIT_LEN --- connection is reading request len.
95 WAIT_MESSAGE --- connection is reading request.
96 WAIT_PROCESS --- connection has just read whole request and
97 waits for call ready routine.
98 SEND_ANSWER --- connection is sending answer string (including length
100 CLOSED --- socket was closed and connection should be deleted.
102 def __init__(self, new_socket, wake_up):
103 self.socket = new_socket
104 self.socket.setblocking(False)
105 self.status = WAIT_LEN
108 self.lock = threading.Lock()
109 self.wake_up = wake_up
112 """Reads length of request.
114 It's a safer alternative to self.socket.recv(4)
116 read = self.socket.recv(4 - len(self.message))
118 # if we read 0 bytes and self.message is empty, then
119 # the client closed the connection
120 if len(self.message) != 0:
121 logger.error("can't read frame size from socket")
125 if len(self.message) == 4:
126 self.len, = struct.unpack('!i', self.message)
128 logger.error("negative frame size, it seems client "
129 "doesn't use FramedTransport")
132 logger.error("empty frame, it's really strange")
136 self.status = WAIT_MESSAGE
140 """Reads data from stream and switch state."""
141 assert self.status in (WAIT_LEN, WAIT_MESSAGE)
142 if self.status == WAIT_LEN:
144 # go back to the main loop here for simplicity instead of
145 # falling through, even though there is a good chance that
146 # the message is already available
147 elif self.status == WAIT_MESSAGE:
148 read = self.socket.recv(self.len - len(self.message))
150 logger.error("can't read frame from socket (get %d of "
151 "%d bytes)" % (len(self.message), self.len))
155 if len(self.message) == self.len:
156 self.status = WAIT_PROCESS
160 """Writes data from socket and switch state."""
161 assert self.status == SEND_ANSWER
162 sent = self.socket.send(self.message)
163 if sent == len(self.message):
164 self.status = WAIT_LEN
168 self.message = self.message[sent:]
171 def ready(self, all_ok, message):
172 """Callback function for switching state and waking up main thread.
174 This function is the only function witch can be called asynchronous.
176 The ready can switch Connection to three states:
177 WAIT_LEN if request was oneway.
178 SEND_ANSWER if request was processed in normal way.
179 CLOSED if request throws unexpected exception.
181 The one wakes up main thread.
183 assert self.status == WAIT_PROCESS
189 if len(message) == 0:
190 # it was a oneway request, do not write answer
192 self.status = WAIT_LEN
194 self.message = struct.pack('!i', len(message)) + message
195 self.status = SEND_ANSWER
199 def is_writeable(self):
200 """Return True if connection should be added to write list of select"""
201 return self.status == SEND_ANSWER
203 # it's not necessary, but...
205 def is_readable(self):
206 """Return True if connection should be added to read list of select"""
207 return self.status in (WAIT_LEN, WAIT_MESSAGE)
211 """Returns True if connection is closed."""
212 return self.status == CLOSED
215 """Returns the file descriptor of the associated socket."""
216 return self.socket.fileno()
219 """Closes connection"""
224 class TNonblockingServer:
225 """Non-blocking server."""
230 inputProtocolFactory=None,
231 outputProtocolFactory=None,
233 self.processor = processor
234 self.socket = lsocket
235 self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
236 self.out_protocol = outputProtocolFactory or self.in_protocol
237 self.threads = int(threads)
239 self.tasks = Queue.Queue()
240 self._read, self._write = socket.socketpair()
241 self.prepared = False
244 def setNumThreads(self, num):
245 """Set the number of worker threads that should be created."""
246 # implement ThreadPool interface
247 assert not self.prepared, "Can't change number of threads after start"
251 """Prepares server for serve requests."""
255 for _ in xrange(self.threads):
256 thread = Worker(self.tasks)
257 thread.setDaemon(True)
262 """Wake up main thread.
264 The server usually waits in select call in we should terminate one.
265 The simplest way is using socketpair.
267 Select always wait to read from the first socket of socketpair.
269 In this case, we can just write anything to the second socket from
272 self._write.send('1')
277 This method causes the serve() method to return. stop() may be invoked
278 from within your handler, or from another thread.
280 After stop() is called, serve() will return but the server will still
281 be listening on the socket. serve() may then be called again to resume
282 processing requests. Alternatively, close() may be called after
283 serve() returns to close the server socket and shutdown all worker
290 """Does select on open connections."""
291 readable = [self.socket.handle.fileno(), self._read.fileno()]
293 for i, connection in self.clients.items():
294 if connection.is_readable():
295 readable.append(connection.fileno())
296 if connection.is_writeable():
297 writable.append(connection.fileno())
298 if connection.is_closed():
300 return select.select(readable, writable, readable)
305 WARNING! You must call prepare() BEFORE calling handle()
307 assert self.prepared, "You have to call prepare before handle"
308 rset, wset, xset = self._select()
309 for readable in rset:
310 if readable == self._read.fileno():
311 # don't care i just need to clean readable flag
312 self._read.recv(1024)
313 elif readable == self.socket.handle.fileno():
314 client = self.socket.accept().handle
315 self.clients[client.fileno()] = Connection(client,
318 connection = self.clients[readable]
320 if connection.status == WAIT_PROCESS:
321 itransport = TTransport.TMemoryBuffer(connection.message)
322 otransport = TTransport.TMemoryBuffer()
323 iprot = self.in_protocol.getProtocol(itransport)
324 oprot = self.out_protocol.getProtocol(otransport)
325 self.tasks.put([self.processor, iprot, oprot,
326 otransport, connection.ready])
327 for writeable in wset:
328 self.clients[writeable].write()
330 self.clients[oob].close()
331 del self.clients[oob]
334 """Closes the server."""
335 for _ in xrange(self.threads):
336 self.tasks.put([None, None, None, None, None])
338 self.prepared = False
343 Serve requests forever, or until stop() is called.
347 while not self._stop: