upload http
[bottlenecks.git] / rubbos / app / httpd-2.0.64 / srclib / apr-util / misc / apr_queue.c
1 /* Copyright 2000-2005 The Apache Software Foundation or its licensors, as
2  * applicable.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "apr.h"
18
19 #if APR_HAVE_STDIO_H
20 #include <stdio.h>
21 #endif
22 #if APR_HAVE_STDLIB_H
23 #include <stdlib.h>
24 #endif
25 #if APR_HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif
28
29 #include "apu.h"
30 #include "apr_portable.h"
31 #include "apr_thread_mutex.h"
32 #include "apr_thread_cond.h"
33 #include "apr_errno.h"
34 #include "apr_queue.h"
35
36 #if APR_HAS_THREADS
37 /* 
38  * define this to get debug messages
39  *
40 #define QUEUE_DEBUG
41  */
42
43 struct apr_queue_t {
44     void              **data;
45     unsigned int        nelts; /**< # elements */
46     unsigned int        in;    /**< next empty location */
47     unsigned int        out;   /**< next filled location */
48     unsigned int        bounds;/**< max size of queue */
49     unsigned int        full_waiters;
50     unsigned int        empty_waiters;
51     apr_thread_mutex_t *one_big_mutex;
52     apr_thread_cond_t  *not_empty;
53     apr_thread_cond_t  *not_full;
54     int                 terminated;
55 };
56
57 #ifdef QUEUE_DEBUG
58 static void Q_DBG(char*msg, apr_queue_t *q) {
59     fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", 
60                     apr_os_thread_current(),
61                     q->nelts, q->in, q->out,
62                     msg
63                     );
64 }
65 #else
66 #define Q_DBG(x,y) 
67 #endif
68
69 /**
70  * Detects when the apr_queue_t is full. This utility function is expected
71  * to be called from within critical sections, and is not threadsafe.
72  */
73 #define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds)
74
75 /**
76  * Detects when the apr_queue_t is empty. This utility function is expected
77  * to be called from within critical sections, and is not threadsafe.
78  */
79 #define apr_queue_empty(queue) ((queue)->nelts == 0)
80
81 /**
82  * Callback routine that is called to destroy this
83  * apr_queue_t when its pool is destroyed.
84  */
85 static apr_status_t queue_destroy(void *data) 
86 {
87     apr_queue_t *queue = data;
88
89     /* Ignore errors here, we can't do anything about them anyway. */
90
91     apr_thread_cond_destroy(queue->not_empty);
92     apr_thread_cond_destroy(queue->not_full);
93     apr_thread_mutex_destroy(queue->one_big_mutex);
94
95     return APR_SUCCESS;
96 }
97
98 /**
99  * Initialize the apr_queue_t.
100  */
101 APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, 
102                                            unsigned int queue_capacity, 
103                                            apr_pool_t *a)
104 {
105     apr_status_t rv;
106     apr_queue_t *queue;
107     queue = apr_palloc(a, sizeof(apr_queue_t));
108     *q = queue;
109
110     /* nested doesn't work ;( */
111     rv = apr_thread_mutex_create(&queue->one_big_mutex,
112                                  APR_THREAD_MUTEX_UNNESTED,
113                                  a);
114     if (rv != APR_SUCCESS) {
115         return rv;
116     }
117
118     rv = apr_thread_cond_create(&queue->not_empty, a);
119     if (rv != APR_SUCCESS) {
120         return rv;
121     }
122
123     rv = apr_thread_cond_create(&queue->not_full, a);
124     if (rv != APR_SUCCESS) {
125         return rv;
126     }
127
128     /* Set all the data in the queue to NULL */
129     queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130     queue->bounds = queue_capacity;
131     queue->nelts = 0;
132     queue->in = 0;
133     queue->out = 0;
134     queue->terminated = 0;
135     queue->full_waiters = 0;
136     queue->empty_waiters = 0;
137
138     apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
139
140     return APR_SUCCESS;
141 }
142
143 /**
144  * Push new data onto the queue. Blocks if the queue is full. Once
145  * the push operation has completed, it signals other threads waiting
146  * in apr_queue_pop() that they may continue consuming sockets.
147  */
148 APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
149 {
150     apr_status_t rv;
151
152     if (queue->terminated) {
153         return APR_EOF; /* no more elements ever again */
154     }
155
156     rv = apr_thread_mutex_lock(queue->one_big_mutex);
157     if (rv != APR_SUCCESS) {
158         return rv;
159     }
160
161     if (apr_queue_full(queue)) {
162         if (!queue->terminated) {
163             queue->full_waiters++;
164             rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165             queue->full_waiters--;
166             if (rv != APR_SUCCESS) {
167                 apr_thread_mutex_unlock(queue->one_big_mutex);
168                 return rv;
169             }
170         }
171         /* If we wake up and it's still empty, then we were interrupted */
172         if (apr_queue_full(queue)) {
173             Q_DBG("queue full (intr)", queue);
174             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175             if (rv != APR_SUCCESS) {
176                 return rv;
177             }
178             if (queue->terminated) {
179                 return APR_EOF; /* no more elements ever again */
180             }
181             else {
182                 return APR_EINTR;
183             }
184         }
185     }
186
187     queue->data[queue->in] = data;
188     queue->in = (queue->in + 1) % queue->bounds;
189     queue->nelts++;
190
191     if (queue->empty_waiters) {
192         Q_DBG("sig !empty", queue);
193         rv = apr_thread_cond_signal(queue->not_empty);
194         if (rv != APR_SUCCESS) {
195             apr_thread_mutex_unlock(queue->one_big_mutex);
196             return rv;
197         }
198     }
199
200     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
201     return rv;
202 }
203
204 /**
205  * Push new data onto the queue. Blocks if the queue is full. Once
206  * the push operation has completed, it signals other threads waiting
207  * in apr_queue_pop() that they may continue consuming sockets.
208  */
209 APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
210 {
211     apr_status_t rv;
212
213     if (queue->terminated) {
214         return APR_EOF; /* no more elements ever again */
215     }
216
217     rv = apr_thread_mutex_lock(queue->one_big_mutex);
218     if (rv != APR_SUCCESS) {
219         return rv;
220     }
221
222     if (apr_queue_full(queue)) {
223         rv = apr_thread_mutex_unlock(queue->one_big_mutex);
224         return APR_EAGAIN;
225     }
226     
227     queue->data[queue->in] = data;
228     queue->in = (queue->in + 1) % queue->bounds;
229     queue->nelts++;
230
231     if (queue->empty_waiters) {
232         Q_DBG("sig !empty", queue);
233         rv  = apr_thread_cond_signal(queue->not_empty);
234         if (rv != APR_SUCCESS) {
235             apr_thread_mutex_unlock(queue->one_big_mutex);
236             return rv;
237         }
238     }
239
240     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
241     return rv;
242 }
243
244 /**
245  * not thread safe
246  */
247 APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
248     return queue->nelts;
249 }
250
251 /**
252  * Retrieves the next item from the queue. If there are no
253  * items available, it will block until one becomes available.
254  * Once retrieved, the item is placed into the address specified by
255  * 'data'.
256  */
257 APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
258 {
259     apr_status_t rv;
260
261     if (queue->terminated) {
262         return APR_EOF; /* no more elements ever again */
263     }
264
265     rv = apr_thread_mutex_lock(queue->one_big_mutex);
266     if (rv != APR_SUCCESS) {
267         return rv;
268     }
269
270     /* Keep waiting until we wake up and find that the queue is not empty. */
271     if (apr_queue_empty(queue)) {
272         if (!queue->terminated) {
273             queue->empty_waiters++;
274             rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
275             queue->empty_waiters--;
276             if (rv != APR_SUCCESS) {
277                 apr_thread_mutex_unlock(queue->one_big_mutex);
278                 return rv;
279             }
280         }
281         /* If we wake up and it's still empty, then we were interrupted */
282         if (apr_queue_empty(queue)) {
283             Q_DBG("queue empty (intr)", queue);
284             rv = apr_thread_mutex_unlock(queue->one_big_mutex);
285             if (rv != APR_SUCCESS) {
286                 return rv;
287             }
288             if (queue->terminated) {
289                 return APR_EOF; /* no more elements ever again */
290             }
291             else {
292                 return APR_EINTR;
293             }
294         }
295     } 
296
297     *data = queue->data[queue->out];
298     queue->nelts--;
299
300     queue->out = (queue->out + 1) % queue->bounds;
301     if (queue->full_waiters) {
302         Q_DBG("signal !full", queue);
303         rv = apr_thread_cond_signal(queue->not_full);
304         if (rv != APR_SUCCESS) {
305             apr_thread_mutex_unlock(queue->one_big_mutex);
306             return rv;
307         }
308     }
309
310     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
311     return rv;
312 }
313
314 /**
315  * Retrieves the next item from the queue. If there are no
316  * items available, it will block until one becomes available.
317  * Once retrieved, the item is placed into the address specified by
318  * 'data'.
319  */
320 APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
321 {
322     apr_status_t rv;
323
324     if (queue->terminated) {
325         return APR_EOF; /* no more elements ever again */
326     }
327
328     rv = apr_thread_mutex_lock(queue->one_big_mutex);
329     if (rv != APR_SUCCESS) {
330         return rv;
331     }
332
333     /* Keep waiting until we wake up and find that the queue is not empty. */
334     if (apr_queue_empty(queue)) {
335         rv = apr_thread_mutex_unlock(queue->one_big_mutex);
336         return APR_EAGAIN;
337     } 
338
339     *data = queue->data[queue->out];
340     queue->nelts--;
341
342     queue->out = (queue->out + 1) % queue->bounds;
343     if (queue->full_waiters) {
344         Q_DBG("signal !full", queue);
345         rv = apr_thread_cond_signal(queue->not_full);
346         if (rv != APR_SUCCESS) {
347             apr_thread_mutex_unlock(queue->one_big_mutex);
348             return rv;
349         }
350     }
351
352     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
353     return rv;
354 }
355
356 APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
357 {
358     apr_status_t rv;
359     Q_DBG("intr all", queue);    
360     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
361         return rv;
362     }
363     apr_thread_cond_broadcast(queue->not_empty);
364     apr_thread_cond_broadcast(queue->not_full);
365
366     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
367         return rv;
368     }
369
370     return APR_SUCCESS;
371 }
372
373 APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
374 {
375     apr_status_t rv;
376
377     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
378         return rv;
379     }
380
381     /* we must hold one_big_mutex when setting this... otherwise,
382      * we could end up setting it and waking everybody up just after a 
383      * would-be popper checks it but right before they block
384      */
385     queue->terminated = 1;
386     if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
387         return rv;
388     }
389     return apr_queue_interrupt_all(queue);
390 }
391
392 #endif /* APR_HAS_THREADS */