bottleneck testcase based on rubbos
[bottlenecks.git] / rubbos / app / tomcat-connectors-1.2.32-src / native / common / jk_lb_worker.c
1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 /***************************************************************************
19  * Description: Load balancer worker, knows how to load balance among      *
20  *              several workers.                                           *
21  * Author:      Gal Shachor <shachor@il.ibm.com>                           *
22  * Author:      Mladen Turk <mturk@apache.org>                             *
23  * Author:      Rainer Jung <rjung@apache.org>                             *
24  * Based on:                                                               *
25  * Version:     $Revision: 1137200 $                                          *
26  ***************************************************************************/
27
28 #include "jk_pool.h"
29 #include "jk_service.h"
30 #include "jk_util.h"
31 #include "jk_worker.h"
32 #include "jk_lb_worker.h"
33 #include "jk_ajp13.h"
34 #include "jk_ajp13_worker.h"
35 #include "jk_ajp14_worker.h"
36 #include "jk_mt.h"
37 #include "jk_shm.h"
38
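/*
 * The load balancing code in this file: selection of a member worker
 * (by load value, by domain or by session route), error recovery,
 * load decay and syncing of the configuration with shared memory.
 */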
42
/*
 * The following two macros need to be kept in sync with
 * the existing values for state and activation.
 * Note: state <= JK_LB_STATE_FORCE is equivalent to
 *       state is none of JK_LB_STATE_BUSY, JK_LB_STATE_ERROR, JK_LB_STATE_PROBE
 * Note: state <= JK_LB_STATE_BUSY is equivalent to
 *       state is none of JK_LB_STATE_ERROR, JK_LB_STATE_PROBE
 * Note: activation == JK_LB_ACTIVATION_ACTIVE is equivalent to
 *       activation is none of JK_LB_ACTIVATION_STOPPED, JK_LB_ACTIVATION_DISABLED
 *
 * Both macro arguments are fully parenthesized so that arbitrary
 * expressions (e.g. a conditional expression) can be passed safely.
 */
#define JK_WORKER_USABLE(s, activation) \
    ((s) <= JK_LB_STATE_FORCE && (activation) == JK_LB_ACTIVATION_ACTIVE)
#define JK_WORKER_USABLE_STICKY(s, activation) \
    ((s) <= JK_LB_STATE_BUSY && (activation) != JK_LB_ACTIVATION_STOPPED)
55
56 static const char *lb_locking_type[] = {
57     JK_LB_LOCK_TEXT_OPTIMISTIC,
58     JK_LB_LOCK_TEXT_PESSIMISTIC,
59     "unknown",
60     NULL
61 };
62
63 static const char *lb_method_type[] = {
64     JK_LB_METHOD_TEXT_REQUESTS,
65     JK_LB_METHOD_TEXT_TRAFFIC,
66     JK_LB_METHOD_TEXT_BUSYNESS,
67     JK_LB_METHOD_TEXT_SESSIONS,
68     "unknown",
69     NULL
70 };
71
72 static const char *lb_state_type[] = {
73     JK_LB_STATE_TEXT_IDLE,
74     JK_LB_STATE_TEXT_OK,
75     JK_LB_STATE_TEXT_RECOVER,
76     JK_LB_STATE_TEXT_FORCE,
77     JK_LB_STATE_TEXT_BUSY,
78     JK_LB_STATE_TEXT_ERROR,
79     JK_LB_STATE_TEXT_PROBE,
80     "unknown",
81     NULL
82 };
83
84 static const char *lb_activation_type[] = {
85     JK_LB_ACTIVATION_TEXT_ACTIVE,
86     JK_LB_ACTIVATION_TEXT_DISABLED,
87     JK_LB_ACTIVATION_TEXT_STOPPED,
88     "unknown",
89     NULL
90 };
91
92 static const char *lb_first_log_names[] = {
93     JK_NOTE_LB_FIRST_NAME,
94     JK_NOTE_LB_FIRST_VALUE,
95     JK_NOTE_LB_FIRST_ACCESSED,
96     JK_NOTE_LB_FIRST_READ,
97     JK_NOTE_LB_FIRST_TRANSFERRED,
98     JK_NOTE_LB_FIRST_ERRORS,
99     JK_NOTE_LB_FIRST_BUSY,
100     JK_NOTE_LB_FIRST_ACTIVATION,
101     JK_NOTE_LB_FIRST_STATE,
102     NULL
103 };
104
105 static const char *lb_last_log_names[] = {
106     JK_NOTE_LB_LAST_NAME,
107     JK_NOTE_LB_LAST_VALUE,
108     JK_NOTE_LB_LAST_ACCESSED,
109     JK_NOTE_LB_LAST_READ,
110     JK_NOTE_LB_LAST_TRANSFERRED,
111     JK_NOTE_LB_LAST_ERRORS,
112     JK_NOTE_LB_LAST_BUSY,
113     JK_NOTE_LB_LAST_ACTIVATION,
114     JK_NOTE_LB_LAST_STATE,
115     NULL
116 };
117
118 struct lb_endpoint
119 {
120     lb_worker_t     *worker;
121     jk_endpoint_t    endpoint;
122     int             *states;
123 };
124 typedef struct lb_endpoint lb_endpoint_t;
125
126
/* Calculate the greatest common divisor of two integers (Euclid).
 * gcd(a, 0) == a and gcd(0, 0) == 0.
 */
static jk_uint64_t gcd(jk_uint64_t a, jk_uint64_t b)
{
    jk_uint64_t r;
    if (b > a) {
        r = a;
        a = b;
        b = r;
    }
    while (b > 0) {
        r = a % b;
        a = b;
        b = r;
    }
    return a;
}

/* Calculate the smallest (least) common multiple of two integers.
 * The division is done before the multiplication, so the intermediate
 * value never exceeds the result: a * b could overflow 64 bits even
 * when the common multiple itself fits. If either argument is 0 the
 * result is 0; this also avoids dividing by gcd(0, 0) == 0.
 */
static jk_uint64_t scm(jk_uint64_t a, jk_uint64_t b)
{
    if (a == 0 || b == 0)
        return 0;
    return a / gcd(a, b) * b;
}
149
150 /* Return the string representation of the lb lock type */
151 const char *jk_lb_get_lock(lb_worker_t *p, jk_logger_t *l)
152 {
153     return lb_locking_type[p->lblock];
154 }
155
156 /* Return the int representation of the lb lock type */
157 int jk_lb_get_lock_code(const char *v)
158 {
159     if (!v)
160         return JK_LB_LOCK_DEF;
161     else if  (*v == 'o' || *v == 'O' || *v == '0')
162         return JK_LB_LOCK_OPTIMISTIC;
163     else if  (*v == 'p' || *v == 'P' || *v == '1')
164         return JK_LB_LOCK_PESSIMISTIC;
165     else
166         return JK_LB_LOCK_DEF;
167 }
168
169 /* Return the string representation of the lb method type */
170 const char *jk_lb_get_method(lb_worker_t *p, jk_logger_t *l)
171 {
172     return lb_method_type[p->lbmethod];
173 }
174
175 /* Return the int representation of the lb method type */
176 int jk_lb_get_method_code(const char *v)
177 {
178     if (!v)
179         return JK_LB_METHOD_DEF;
180     else if  (*v == 'r' || *v == 'R' || *v == '0')
181         return JK_LB_METHOD_REQUESTS;
182     else if  (*v == 't' || *v == 'T' || *v == '1')
183         return JK_LB_METHOD_TRAFFIC;
184     else if  (*v == 'b' || *v == 'B' || *v == '2')
185         return JK_LB_METHOD_BUSYNESS;
186     else if  (*v == 's' || *v == 'S' || *v == '3')
187         return JK_LB_METHOD_SESSIONS;
188     else
189         return JK_LB_METHOD_DEF;
190 }
191
192 /* Return the string representation of the balance worker state */
193 const char *jk_lb_get_state(lb_sub_worker_t *p, jk_logger_t *l)
194 {
195     return lb_state_type[p->s->state];
196 }
197
198 /* Return the int representation of the lb state */
199 int jk_lb_get_state_code(const char *v)
200 {
201     if (!v)
202         return JK_LB_STATE_DEF;
203     else if  (*v == 'i' || *v == 'I' || *v == 'n' || *v == 'N' || *v == '0')
204         return JK_LB_STATE_IDLE;
205     else if  (*v == 'o' || *v == 'O' || *v == '1')
206         return JK_LB_STATE_OK;
207     else if  (*v == 'r' || *v == 'R' || *v == '2')
208         return JK_LB_STATE_RECOVER;
209     else if  (*v == 'f' || *v == 'F' || *v == '3')
210         return JK_LB_STATE_FORCE;
211     else if  (*v == 'b' || *v == 'B' || *v == '4')
212         return JK_LB_STATE_BUSY;
213     else if  (*v == 'e' || *v == 'E' || *v == '5')
214         return JK_LB_STATE_ERROR;
215     else if  (*v == 'p' || *v == 'P' || *v == '6')
216         return JK_LB_STATE_PROBE;
217     else
218         return JK_LB_STATE_DEF;
219 }
220
221 /* Return the string representation of the balance worker activation */
222 /* based on the integer representation */
223 const char *jk_lb_get_activation_direct(int activation, jk_logger_t *l)
224 {
225     return lb_activation_type[activation];
226 }
227
228 /* Return the string representation of the balance worker activation */
229 /* based on the sub worker struct */
230 const char *jk_lb_get_activation(lb_sub_worker_t *p, jk_logger_t *l)
231 {
232     return lb_activation_type[p->activation];
233 }
234
235 int jk_lb_get_activation_code(const char *v)
236 {
237     if (!v)
238         return JK_LB_ACTIVATION_DEF;
239     else if (*v == 'a' || *v == 'A' || *v == '0')
240         return JK_LB_ACTIVATION_ACTIVE;
241     else if (*v == 'd' || *v == 'D' || *v == '1')
242         return JK_LB_ACTIVATION_DISABLED;
243     else if (*v == 's' || *v == 'S' || *v == '2')
244         return JK_LB_ACTIVATION_STOPPED;
245     else
246         return JK_LB_ACTIVATION_DEF;
247 }
248
249 /* Update the load multipliers wrt. lb_factor */
250 void update_mult(lb_worker_t *p, jk_logger_t *l)
251 {
252     unsigned int i = 0;
253     jk_uint64_t s = 1;
254     JK_TRACE_ENTER(l);
255     for (i = 0; i < p->num_of_workers; i++) {
256         s = scm(s, p->lb_workers[i].lb_factor);
257     }
258     for (i = 0; i < p->num_of_workers; i++) {
259         p->lb_workers[i].lb_mult = s / p->lb_workers[i].lb_factor;
260         if (JK_IS_DEBUG_LEVEL(l))
261             jk_log(l, JK_LOG_DEBUG,
262                    "worker %s gets multiplicity %"
263                    JK_UINT64_T_FMT,
264                    p->lb_workers[i].name,
265                    p->lb_workers[i].lb_mult);
266     }
267     JK_TRACE_EXIT(l);
268 }
269
270 /* Reset all lb values.
271  */
272 void reset_lb_values(lb_worker_t *p, jk_logger_t *l)
273 {
274     unsigned int i = 0;
275     JK_TRACE_ENTER(l);
276     if (p->lbmethod != JK_LB_METHOD_BUSYNESS) {
277         for (i = 0; i < p->num_of_workers; i++) {
278             p->lb_workers[i].s->lb_value = 0;
279         }
280     }
281     JK_TRACE_EXIT(l);
282 }
283
284 /* Syncing config values from shm */
285 void jk_lb_pull(lb_worker_t *p, int locked, jk_logger_t *l)
286 {
287     unsigned int i = 0;
288
289     JK_TRACE_ENTER(l);
290
291     if (JK_IS_DEBUG_LEVEL(l))
292         jk_log(l, JK_LOG_DEBUG,
293                "syncing mem for lb '%s' from shm (%u->%u)",
294                p->name, p->sequence, p->s->h.sequence);
295     if (locked == JK_FALSE)
296         jk_shm_lock();
297     p->sticky_session = p->s->sticky_session;
298     p->sticky_session_force = p->s->sticky_session_force;
299     p->recover_wait_time = p->s->recover_wait_time;
300     p->error_escalation_time = p->s->error_escalation_time;
301     p->max_reply_timeouts = p->s->max_reply_timeouts;
302     p->retries = p->s->retries;
303     p->retry_interval = p->s->retry_interval;
304     p->lbmethod = p->s->lbmethod;
305     p->lblock = p->s->lblock;
306     p->max_packet_size = p->s->max_packet_size;
307     p->sequence = p->s->h.sequence;
308     strncpy(p->session_cookie, p->s->session_cookie, JK_SHM_STR_SIZ);
309     strncpy(p->session_path, p->s->session_path, JK_SHM_STR_SIZ);
310
311     for (i = 0; i < p->num_of_workers; i++) {
312         lb_sub_worker_t *w = &p->lb_workers[i];
313         if (w->sequence != w->s->h.sequence) {
314             jk_worker_t *jw = w->worker;
315             ajp_worker_t *aw = (ajp_worker_t *)jw->worker_private;
316
317             if (JK_IS_DEBUG_LEVEL(l))
318                 jk_log(l, JK_LOG_DEBUG,
319                        "syncing mem for member '%s' of lb '%s' from shm",
320                        w->name, p->name);
321
322             jk_ajp_pull(aw, JK_TRUE, l);
323             strncpy(w->route, w->s->route, JK_SHM_STR_SIZ);
324             strncpy(w->domain, w->s->domain, JK_SHM_STR_SIZ);
325             strncpy(w->redirect, w->s->redirect, JK_SHM_STR_SIZ);
326             w->distance = w->s->distance;
327             w->activation = w->s->activation;
328             w->lb_factor = w->s->lb_factor;
329             w->lb_mult = w->s->lb_mult;
330             w->sequence = w->s->h.sequence;
331         }
332     }
333     if (locked == JK_FALSE)
334         jk_shm_unlock();
335
336     JK_TRACE_EXIT(l);
337 }
338
339 /* Syncing config values to shm */
340 void jk_lb_push(lb_worker_t *p, int locked, jk_logger_t *l)
341 {
342     unsigned int i = 0;
343
344     JK_TRACE_ENTER(l);
345
346     if (JK_IS_DEBUG_LEVEL(l))
347         jk_log(l, JK_LOG_DEBUG,
348                "syncing shm for lb '%s' from mem (%u->%u)",
349                p->name, p->s->h.sequence, p->sequence);
350     if (locked == JK_FALSE)
351         jk_shm_lock();
352     p->s->sticky_session = p->sticky_session;
353     p->s->sticky_session_force = p->sticky_session_force;
354     p->s->recover_wait_time = p->recover_wait_time;
355     p->s->error_escalation_time = p->error_escalation_time;
356     p->s->max_reply_timeouts = p->max_reply_timeouts;
357     p->s->retries = p->retries;
358     p->s->retry_interval = p->retry_interval;
359     p->s->lbmethod = p->lbmethod;
360     p->s->lblock = p->lblock;
361     p->s->max_packet_size = p->max_packet_size;
362     p->s->h.sequence = p->sequence;
363     strncpy(p->s->session_cookie, p->session_cookie, JK_SHM_STR_SIZ);
364     strncpy(p->s->session_path, p->session_path, JK_SHM_STR_SIZ);
365
366     for (i = 0; i < p->num_of_workers; i++) {
367         lb_sub_worker_t *w = &p->lb_workers[i];
368         if (w->sequence != w->s->h.sequence) {
369             jk_worker_t *jw = w->worker;
370             ajp_worker_t *aw = (ajp_worker_t *)jw->worker_private;
371
372             if (JK_IS_DEBUG_LEVEL(l))
373                 jk_log(l, JK_LOG_DEBUG,
374                        "syncing shm for member '%s' of lb '%s' from mem",
375                        w->name, p->name);
376
377             jk_ajp_push(aw, JK_TRUE, l);
378             strncpy(w->s->route, w->route, JK_SHM_STR_SIZ);
379             strncpy(w->s->domain, w->domain, JK_SHM_STR_SIZ);
380             strncpy(w->s->redirect, w->redirect, JK_SHM_STR_SIZ);
381             w->s->distance = w->distance;
382             w->s->activation = w->activation;
383             w->s->lb_factor = w->lb_factor;
384             w->s->lb_mult = w->lb_mult;
385             w->s->h.sequence = w->sequence;
386         }
387     }
388     if (locked == JK_FALSE)
389         jk_shm_unlock();
390
391     JK_TRACE_EXIT(l);
392 }
393
394 /* Retrieve the parameter with the given name                                */
395 static char *get_path_param(jk_ws_service_t *s, const char *name)
396 {
397     char *id_start = NULL;
398     for (id_start = strstr(s->req_uri, name);
399          id_start; id_start = strstr(id_start + 1, name)) {
400         if (id_start[strlen(name)] == '=') {
401             /*
402              * Session path-cookie was found, get it's value
403              */
404             id_start += (1 + strlen(name));
405             if (strlen(id_start)) {
406                 char *id_end;
407                 id_start = jk_pool_strdup(s->pool, id_start);
408                 /*
409                  * The query string is not part of req_uri, however
410                  * to be on the safe side lets remove the trailing query
411                  * string if appended...
412                  */
413                 if ((id_end = strchr(id_start, '?')) != NULL) {
414                     *id_end = '\0';
415                 }
416                 /*
417                  * Remove any trailing path element.
418                  */
419                 if ((id_end = strchr(id_start, ';')) != NULL) {
420                     *id_end = '\0';
421                 }
422                 return id_start;
423             }
424         }
425     }
426
427     return NULL;
428 }
429
430 /* Retrieve the cookie with the given name                                   */
431 static char *get_cookie(jk_ws_service_t *s, const char *name)
432 {
433     unsigned i;
434     char *result = NULL;
435
436     for (i = 0; i < s->num_headers; i++) {
437         if (strcasecmp(s->headers_names[i], "cookie") == 0) {
438
439             char *id_start;
440             for (id_start = strstr(s->headers_values[i], name);
441                  id_start; id_start = strstr(id_start + 1, name)) {
442                 if (id_start == s->headers_values[i] ||
443                     id_start[-1] == ';' ||
444                     id_start[-1] == ',' || isspace((int)id_start[-1])) {
445                     id_start += strlen(name);
446                     while (*id_start && isspace((int)(*id_start)))
447                         ++id_start;
448                     if (*id_start == '=' && id_start[1]) {
449                         /*
450                          * Session cookie was found, get it's value
451                          */
452                         char *id_end;
453                         size_t sz;
454                         ++id_start;
455                         if ((id_end = strpbrk(id_start, ";,")) != NULL)
456                             sz = id_end - id_start;
457                         else {
458                             sz = strlen(id_start);
459                             id_end = id_start + sz;
460                         }
461                         if (result == NULL) {
462                             result = jk_pool_alloc(s->pool, sz + 1);
463                             memcpy(result, id_start, sz);
464                             result[sz] = '\0';
465                         }
466                         else {
467                             size_t osz = strlen(result) + 1;
468                             result =
469                                 jk_pool_realloc(s->pool, osz + sz + 1, result, osz);
470                             strcat(result, ";");
471                             strncat(result, id_start, sz);
472                         }
473                         id_start = id_end;
474                     }
475                 }
476             }
477         }
478     }
479
480     return result;
481 }
482
483
484 /* Retrieve session id from the cookie or the parameter
485  * (parameter first)
486  */
487 static char *get_sessionid(jk_ws_service_t *s, lb_worker_t *p, jk_logger_t *l)
488 {
489     char *val;
490     val = get_path_param(s, p->session_path);
491     if (!val) {
492         val = get_cookie(s, p->session_cookie);
493     }
494     if (val && !*val) {
495         /* TODO: For now only log the empty sessions.
496          *       However we should probably return 400
497          *       (BAD_REQUEST) in this case
498          */
499         jk_log(l, JK_LOG_INFO,
500                "Detected empty session identifier.");
501         return NULL;
502     }
503     return val;
504 }
505
506 static void close_workers(lb_worker_t *p, int num_of_workers, jk_logger_t *l)
507 {
508     int i = 0;
509     for (i = 0; i < num_of_workers; i++) {
510         p->lb_workers[i].worker->destroy(&(p->lb_workers[i].worker), l);
511     }
512 }
513
514 /* If the worker is in error state run
515  * retry on that worker. It will be marked as
516  * operational if the retry timeout is elapsed.
517  * The worker might still be unusable, but we try
518  * anyway.
519  * If the worker is in ok state and got no requests
520  * since the last global maintenance, we mark its
521  * state as not available.
522  * Return the number of workers not in error state.
523  */
524 static int recover_workers(lb_worker_t *p,
525                            jk_uint64_t curmax,
526                            time_t now,
527                            jk_logger_t *l)
528 {
529     unsigned int i;
530     int non_error = 0;
531     int elapsed;
532     lb_sub_worker_t *w = NULL;
533     ajp_worker_t *aw = NULL;
534     JK_TRACE_ENTER(l);
535
536     if (p->sequence != p->s->h.sequence)
537         jk_lb_pull(p, JK_TRUE, l);
538     for (i = 0; i < p->num_of_workers; i++) {
539         w = &p->lb_workers[i];
540         aw = (ajp_worker_t *)w->worker->worker_private;
541         if (w->s->state == JK_LB_STATE_ERROR) {
542             elapsed = (int)difftime(now, w->s->error_time);
543             if (elapsed <= p->recover_wait_time) {
544                 if (JK_IS_DEBUG_LEVEL(l))
545                     jk_log(l, JK_LOG_DEBUG,
546                            "worker %s will recover in %d seconds",
547                            w->name, p->recover_wait_time - elapsed);
548             }
549             else {
550                 if (JK_IS_DEBUG_LEVEL(l))
551                     jk_log(l, JK_LOG_DEBUG,
552                            "worker %s is marked for recovery",
553                            w->name);
554                 if (p->lbmethod != JK_LB_METHOD_BUSYNESS)
555                     w->s->lb_value = curmax;
556                 aw->s->reply_timeouts = 0;
557                 w->s->state = JK_LB_STATE_RECOVER;
558                 non_error++;
559             }
560         }
561         else if (w->s->error_time > 0 &&
562                  (int)difftime(now, w->s->error_time) >= p->error_escalation_time) {
563             if (JK_IS_DEBUG_LEVEL(l))
564                 jk_log(l, JK_LOG_DEBUG,
565                        "worker %s escalating local error to global error",
566                        w->name);
567             w->s->state = JK_LB_STATE_ERROR;
568         }
569         else {
570             non_error++;
571             if (w->s->state == JK_LB_STATE_OK &&
572                 aw->s->used == w->s->elected_snapshot)
573                 w->s->state = JK_LB_STATE_IDLE;
574         }
575         w->s->elected_snapshot = aw->s->used;
576     }
577
578     JK_TRACE_EXIT(l);
579     return non_error;
580 }
581
582 static int force_recovery(lb_worker_t *p,
583                           int *states,
584                           jk_logger_t *l)
585 {
586     unsigned int i;
587     int forced = 0;
588     lb_sub_worker_t *w = NULL;
589     ajp_worker_t *aw = NULL;
590     JK_TRACE_ENTER(l);
591
592     for (i = 0; i < p->num_of_workers; i++) {
593         w = &p->lb_workers[i];
594         if (w->s->state == JK_LB_STATE_ERROR) {
595             if (JK_IS_DEBUG_LEVEL(l))
596                 jk_log(l, JK_LOG_INFO,
597                        "worker %s is marked for forced recovery",
598                        w->name);
599             aw = (ajp_worker_t *)w->worker->worker_private;
600             aw->s->reply_timeouts = 0;
601             w->s->state = JK_LB_STATE_FORCE;
602             if (states != NULL)
603                 states[i] = JK_LB_STATE_FORCE;
604             forced++;
605         }
606     }
607
608     JK_TRACE_EXIT(l);
609     return forced;
610 }
611
612 /* Divide old load values by the decay factor,
613  * such that older values get less important
614  * for the routing decisions.
615  */
616 static jk_uint64_t decay_load(lb_worker_t *p,
617                               time_t exponent,
618                               jk_logger_t *l)
619 {
620     unsigned int i;
621     jk_uint64_t curmax = 0;
622     lb_sub_worker_t *w;
623     ajp_worker_t *aw;
624
625     JK_TRACE_ENTER(l);
626     for (i = 0; i < p->num_of_workers; i++) {
627         w = &p->lb_workers[i];
628         if (p->lbmethod != JK_LB_METHOD_BUSYNESS) {
629             w->s->lb_value >>= exponent;
630             if (w->s->lb_value > curmax) {
631                 curmax = w->s->lb_value;
632             }
633         }
634         aw = (ajp_worker_t *)w->worker->worker_private;
635         aw->s->reply_timeouts >>= exponent;
636     }
637     JK_TRACE_EXIT(l);
638     return curmax;
639 }
640
641 static int JK_METHOD maintain_workers(jk_worker_t *p, time_t now, jk_logger_t *l)
642 {
643     unsigned int i = 0;
644     jk_uint64_t curmax = 0;
645     long delta;
646
647     JK_TRACE_ENTER(l);
648     if (p && p->worker_private) {
649         lb_worker_t *lb = (lb_worker_t *)p->worker_private;
650
651         for (i = 0; i < lb->num_of_workers; i++) {
652             if (lb->lb_workers[i].worker->maintain) {
653                 lb->lb_workers[i].worker->maintain(lb->lb_workers[i].worker, now, l);
654             }
655         }
656
657         jk_shm_lock();
658
659         /* Now we check for global maintenance (once for all processes).
660          * Checking workers for recovery and applying decay to the
661          * load values should not be done by each process individually.
662          * Therefore we globally sync and we use a global timestamp.
663          * Since it's possible that we come here a few milliseconds
664          * before the interval has passed, we allow a little tolerance.
665          */
666         delta = (long)difftime(now, lb->s->last_maintain_time) + JK_LB_MAINTAIN_TOLERANCE;
667         if (delta >= lb->maintain_time) {
668             lb->s->last_maintain_time = now;
669             if (JK_IS_DEBUG_LEVEL(l))
670                 jk_log(l, JK_LOG_DEBUG,
671                        "decay with 2^%d",
672                        JK_LB_DECAY_MULT * delta / lb->maintain_time);
673             curmax = decay_load(lb, JK_LB_DECAY_MULT * delta / lb->maintain_time, l);
674             if (!recover_workers(lb, curmax, now, l)) {
675                 force_recovery(lb, NULL, l);
676             }
677         }
678
679         jk_shm_unlock();
680
681     }
682     else {
683         JK_LOG_NULL_PARAMS(l);
684     }
685
686     JK_TRACE_EXIT(l);
687     return JK_TRUE;
688 }
689
690 static int find_by_session(jk_ws_service_t *s,
691                            lb_worker_t *p,
692                            const char *session_route,
693                            jk_logger_t *l)
694 {
695
696     int rc = -1;
697     unsigned int i;
698
699     for (i = 0; i < p->num_of_workers; i++) {
700         if (strcmp(p->lb_workers[i].route, session_route) == 0) {
701             rc = i;
702             break;
703         }
704     }
705     return rc;
706 }
707
708 static int find_best_bydomain(jk_ws_service_t *s,
709                               lb_worker_t *p,
710                               const char *route_or_domain,
711                               int *states,
712                               jk_logger_t *l)
713 {
714     unsigned int i;
715     int d = 0;
716     jk_uint64_t curmin = 0;
717     int candidate = -1;
718     int activation;
719     lb_sub_worker_t wr;
720     char *idpart = strchr(route_or_domain, '.');
721     size_t domain_len = 0;
722
723     if (idpart) {
724         domain_len = idpart - route_or_domain;
725     }
726     else {
727         domain_len = strlen(route_or_domain);
728     }
729     /* First try to see if we have available candidate */
730     for (i = 0; i < p->num_of_workers; i++) {
731         /* Skip all workers that are not member of domain */
732         wr = p->lb_workers[i];
733         if (strlen(wr.domain) == 0 ||
734             strlen(wr.domain) != domain_len ||
735             strncmp(wr.domain, route_or_domain, domain_len))
736             continue;
737         /* Take into calculation only the workers that are
738          * not in error state, stopped, disabled or busy.
739          */
740         activation = s->extension.activation ?
741                      s->extension.activation[i] :
742                      JK_LB_ACTIVATION_UNSET;
743         if (activation == JK_LB_ACTIVATION_UNSET)
744             activation = wr.activation;
745         if (JK_WORKER_USABLE(states[wr.i], activation)) {
746             if (candidate < 0 || wr.distance < d ||
747                 (wr.s->lb_value < curmin &&
748                 wr.distance == d)) {
749                 candidate = i;
750                 curmin = wr.s->lb_value;
751                 d = wr.distance;
752             }
753         }
754     }
755     return candidate;
756 }
757
758
759 static int find_best_byvalue(jk_ws_service_t *s,
760                              lb_worker_t *p,
761                              int *states,
762                              jk_logger_t *l)
763 {
764     unsigned int i;
765     unsigned int j;
766     unsigned int offset;
767     int d = 0;
768     jk_uint64_t curmin = 0;
769
770     /* find the least busy worker */
771     int candidate = -1;
772     int activation;
773     lb_sub_worker_t wr;
774
775     offset = p->next_offset;
776
777     /* First try to see if we have available candidate */
778     for (j = offset; j < p->num_of_workers + offset; j++) {
779         i = j % p->num_of_workers;
780         wr = p->lb_workers[i];
781         activation = s->extension.activation ?
782                      s->extension.activation[i] :
783                      JK_LB_ACTIVATION_UNSET;
784         if (activation == JK_LB_ACTIVATION_UNSET)
785             activation = wr.activation;
786
787         /* Take into calculation only the workers that are
788          * not in error state, stopped, disabled or busy.
789          */
790         if (JK_WORKER_USABLE(states[wr.i], activation)) {
791             if (candidate < 0 || wr.distance < d ||
792                 (wr.s->lb_value < curmin &&
793                 wr.distance == d)) {
794                 candidate = i;
795                 curmin = wr.s->lb_value;
796                 d = wr.distance;
797                 p->next_offset = i + 1;
798             }
799         }
800     }
801     return candidate;
802 }
803
804 static int find_bysession_route(jk_ws_service_t *s,
805                                 lb_worker_t *p,
806                                 const char *session_route,
807                                 int *states,
808                                 jk_logger_t *l)
809 {
810     int uses_domain  = 0;
811     int candidate = -1;
812
813     candidate = find_by_session(s, p, session_route, l);
814     if (candidate < 0) {
815         uses_domain = 1;
816         candidate = find_best_bydomain(s, p, session_route, states, l);
817     }
818     if (candidate >= 0) {
819         lb_sub_worker_t wr = p->lb_workers[candidate];
820         int activation;
821         if (uses_domain)
822             s->route = wr.domain;
823         activation = s->extension.activation ?
824                      s->extension.activation[candidate] :
825                      JK_LB_ACTIVATION_UNSET;
826         if (activation == JK_LB_ACTIVATION_UNSET)
827             activation = wr.activation;
828         if (!JK_WORKER_USABLE_STICKY(states[wr.i], activation)) {
829             /* We have a worker that is error state or stopped.
830              * If it has a redirection set use that redirection worker.
831              * This enables to safely remove the member from the
832              * balancer. Of course you will need a some kind of
833              * session replication between those two remote.
834              */
835             if (p->sticky_session_force)
836                 candidate = -1;
837             else if (*wr.redirect) {
838                 candidate = find_by_session(s, p, wr.redirect, l);
839                 s->route = NULL;
840             }
841             else if (*wr.domain && !uses_domain) {
842                 candidate = find_best_bydomain(s, p, wr.domain, states, l);
843                 if (candidate >= 0) {
844                     s->route = wr.domain;
845                 } else {
846                     s->route = NULL;
847                 }
848             }
849             if (candidate >= 0) {
850                 wr = p->lb_workers[candidate];
851                 activation = s->extension.activation ?
852                              s->extension.activation[candidate] :
853                              JK_LB_ACTIVATION_UNSET;
854                 if (activation == JK_LB_ACTIVATION_UNSET)
855                     activation = wr.activation;
856                 if (!JK_WORKER_USABLE_STICKY(states[wr.i], activation))
857                     candidate = -1;
858             }
859         }
860     }
861     return candidate;
862 }
863
864 static int find_failover_worker(jk_ws_service_t *s,
865                                 lb_worker_t *p,
866                                 int *states,
867                                 jk_logger_t *l)
868 {
869     int rc = -1;
870     unsigned int i;
871     char *redirect = NULL;
872
873     for (i = 0; i < p->num_of_workers; i++) {
874         if (strlen(p->lb_workers[i].redirect)) {
875             redirect = p->lb_workers[i].redirect;
876             break;
877         }
878     }
879     if (redirect)
880         rc = find_bysession_route(s, p, redirect, states, l);
881     return rc;
882 }
883
884 static int find_best_worker(jk_ws_service_t *s,
885                             lb_worker_t *p,
886                             int *states,
887                             jk_logger_t *l)
888 {
889     int rc = -1;
890
891     rc = find_best_byvalue(s, p, states, l);
892     /* By default use worker route as session route */
893     if (rc < 0)
894         rc = find_failover_worker(s, p, states, l);
895     return rc;
896 }
897
898 static int get_most_suitable_worker(jk_ws_service_t *s,
899                                     lb_worker_t *p,
900                                     char *sessionid,
901                                     int *states,
902                                     jk_logger_t *l)
903 {
904     int rc = -1;
905     int r;
906
907     JK_TRACE_ENTER(l);
908     if (p->num_of_workers == 1) {
909         /* No need to find the best worker
910          * if there is a single one
911          */
912         int activation = s->extension.activation ?
913                          s->extension.activation[0] :
914                          JK_LB_ACTIVATION_UNSET;
915         if (activation == JK_LB_ACTIVATION_UNSET)
916             activation = p->lb_workers[0].activation;
917         if (JK_WORKER_USABLE_STICKY(states[0], activation)) {
918             if (activation != JK_LB_ACTIVATION_DISABLED) {
919                 JK_TRACE_EXIT(l);
920                 return 0;
921             }
922         }
923         else {
924             JK_TRACE_EXIT(l);
925             return -1;
926         }
927     }
928     if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
929         r = jk_shm_lock();
930     else {
931         JK_ENTER_CS(&(p->cs), r);
932     }
933     if (!r) {
934        jk_log(l, JK_LOG_ERROR,
935               "locking failed (errno=%d)",
936               errno);
937     }
938     if (sessionid) {
939         char *session = sessionid;
940         while (sessionid) {
941             char *next = strchr(sessionid, ';');
942             char *session_route = NULL;
943             if (next)
944                *next++ = '\0';
945             if (JK_IS_DEBUG_LEVEL(l))
946                 jk_log(l, JK_LOG_DEBUG,
947                        "searching worker for partial sessionid %s",
948                        sessionid);
949             session_route = strchr(sessionid, '.');
950             if (session_route) {
951                 ++session_route;
952
953                 if (JK_IS_DEBUG_LEVEL(l))
954                     jk_log(l, JK_LOG_DEBUG,
955                            "searching worker for session route %s",
956                            session_route);
957
958                 /* We have a session route. Whow! */
959                 rc = find_bysession_route(s, p, session_route, states, l);
960                 if (rc >= 0) {
961                     lb_sub_worker_t *wr = &(p->lb_workers[rc]);
962                     if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
963                         jk_shm_unlock();
964                     else {
965                         JK_LEAVE_CS(&(p->cs), r);
966                     }
967                     if (JK_IS_DEBUG_LEVEL(l))
968                         jk_log(l, JK_LOG_DEBUG,
969                                "found worker %s (%s) for route %s and partial sessionid %s",
970                                wr->name, wr->route, session_route, sessionid);
971                     JK_TRACE_EXIT(l);
972                     return rc;
973                 }
974             }
975             /* Try next partial sessionid if present */
976             sessionid = next;
977             rc = -1;
978         }
979         if (rc < 0 && p->sticky_session_force) {
980             if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
981                 jk_shm_unlock();
982             else {
983                 JK_LEAVE_CS(&(p->cs), r);
984             }
985             jk_log(l, JK_LOG_INFO,
986                    "all workers are in error state for session %s",
987                    session);
988             JK_TRACE_EXIT(l);
989             return -1;
990         }
991     }
992     rc = find_best_worker(s, p, states, l);
993     if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
994         jk_shm_unlock();
995     else {
996         JK_LEAVE_CS(&(p->cs), r);
997     }
998     if (rc >= 0) {
999         lb_sub_worker_t *wr = &(p->lb_workers[rc]);
1000         if (JK_IS_DEBUG_LEVEL(l))
1001             jk_log(l, JK_LOG_DEBUG,
1002                    "found best worker %s (%s) using method '%s'",
1003                    wr->name, wr->route, jk_lb_get_method(p, l));
1004         JK_TRACE_EXIT(l);
1005         return rc;
1006     }
1007     JK_TRACE_EXIT(l);
1008     return -1;
1009 }
1010
1011 static void lb_add_log_items(jk_ws_service_t *s,
1012                              const char *const *log_names,
1013                              lb_sub_worker_t *w,
1014                              jk_logger_t *l)
1015 {
1016     ajp_worker_t *aw = (ajp_worker_t *)w->worker->worker_private;
1017     const char **log_values = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT);
1018     char *buf = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT * JK_LB_UINT64_STR_SZ);
1019     if (log_values && buf) {
1020         /* JK_NOTE_LB_FIRST/LAST_NAME */
1021         log_values[0] = w->name;
1022         snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->lb_value);
1023         /* JK_NOTE_LB_FIRST/LAST_VALUE */
1024         log_values[1] = buf;
1025         buf += JK_LB_UINT64_STR_SZ;
1026         snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->used);
1027         /* JK_NOTE_LB_FIRST/LAST_ACCESSED */
1028         log_values[2] = buf;
1029         buf += JK_LB_UINT64_STR_SZ;
1030         snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->readed);
1031         /* JK_NOTE_LB_FIRST/LAST_READ */
1032         log_values[3] = buf;
1033         buf += JK_LB_UINT64_STR_SZ;
1034         snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->transferred);
1035         /* JK_NOTE_LB_FIRST/LAST_TRANSFERRED */
1036         log_values[4] = buf;
1037         buf += JK_LB_UINT64_STR_SZ;
1038         snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT32_T_FMT, w->s->errors);
1039         /* JK_NOTE_LB_FIRST/LAST_ERRORS */
1040         log_values[5] = buf;
1041         buf += JK_LB_UINT64_STR_SZ;
1042         snprintf(buf, JK_LB_UINT64_STR_SZ, "%d", aw->s->busy);
1043         /* JK_NOTE_LB_FIRST/LAST_BUSY */
1044         log_values[6] = buf;
1045         /* JK_NOTE_LB_FIRST/LAST_ACTIVATION */
1046         log_values[7] = jk_lb_get_activation(w, l);
1047         /* JK_NOTE_LB_FIRST/LAST_STATE */
1048         log_values[8] = jk_lb_get_state(w, l);
1049         s->add_log_items(s, log_names, log_values, JK_LB_NOTES_COUNT);
1050     }
1051 }
1052
1053 static int JK_METHOD service(jk_endpoint_t *e,
1054                              jk_ws_service_t *s,
1055                              jk_logger_t *l, int *is_error)
1056 {
1057     lb_endpoint_t *p;
1058     int attempt = 0;
1059     lb_sub_worker_t *prec = NULL;
1060     int num_of_workers;
1061     int first = 1;
1062     int was_forced = 0;
1063     int recoverable = JK_TRUE;
1064     int rc = JK_UNSET;
1065     char *sessionid = NULL;
1066     int i;
1067     int retry = 0;
1068
1069     JK_TRACE_ENTER(l);
1070
1071     if (!e || !e->endpoint_private || !s || !is_error) {
1072         JK_LOG_NULL_PARAMS(l);
1073         if (is_error)
1074             *is_error = JK_HTTP_SERVER_ERROR;
1075         JK_TRACE_EXIT(l);
1076         return JK_FALSE;
1077     }
1078
1079     p = e->endpoint_private;
1080     num_of_workers = p->worker->num_of_workers;
1081
1082     /* Set returned error to OK */
1083     *is_error = JK_HTTP_OK;
1084
1085     if (p->worker->sequence != p->worker->s->h.sequence)
1086         jk_lb_pull(p->worker, JK_FALSE, l);
1087     for (i = 0; i < num_of_workers; i++) {
1088         lb_sub_worker_t *rec = &(p->worker->lb_workers[i]);
1089         if (rec->s->state == JK_LB_STATE_BUSY) {
1090             if (ajp_has_endpoint(rec->worker, l)) {
1091                 if (JK_IS_DEBUG_LEVEL(l))
1092                     jk_log(l, JK_LOG_DEBUG,
1093                            "worker %s busy count fixed",
1094                            rec->name);
1095                 rec->s->state = JK_LB_STATE_OK;
1096             }
1097         }
1098         /* Copy the shared state info */
1099         p->states[i] = rec->s->state;
1100     }
1101
1102     /* set the recovery post, for LB mode */
1103     s->reco_buf = jk_b_new(s->pool);
1104     if (!s->reco_buf) {
1105         *is_error = JK_HTTP_SERVER_ERROR;
1106         jk_log(l, JK_LOG_ERROR,
1107                "Failed allocating AJP message");
1108         JK_TRACE_EXIT(l);
1109         return JK_SERVER_ERROR;
1110     }
1111     if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) {
1112         *is_error = JK_HTTP_SERVER_ERROR;
1113         jk_log(l, JK_LOG_ERROR,
1114                "Failed allocating AJP message buffer");
1115         JK_TRACE_EXIT(l);
1116         return JK_SERVER_ERROR;
1117     }
1118     jk_b_reset(s->reco_buf);
1119     s->reco_status = RECO_INITED;
1120
1121     if (p->worker->sticky_session) {
1122         /* Use sessionid only if sticky_session is
1123          * defined for this load balancer
1124          */
1125         sessionid = get_sessionid(s, p->worker, l);
1126     }
1127     if (JK_IS_DEBUG_LEVEL(l))
1128         jk_log(l, JK_LOG_DEBUG,
1129                "service sticky_session=%d id='%s'",
1130                p->worker->sticky_session, sessionid ? sessionid : "empty");
1131
1132     while (recoverable == JK_TRUE) {
1133         if (attempt >= num_of_workers) {
1134             retry++;
1135             if (retry >= p->worker->retries) {
1136                 /* Done with retrying */
1137                 break;
1138             }
1139             if (JK_IS_DEBUG_LEVEL(l))
1140                 jk_log(l, JK_LOG_DEBUG,
1141                        "retry %d, sleeping for %d ms before retrying",
1142                        retry, p->worker->retry_interval);
1143             jk_sleep(p->worker->retry_interval);
1144             /* Pull shared memory if something changed during sleep */
1145             if (p->worker->sequence != p->worker->s->h.sequence)
1146                 jk_lb_pull(p->worker, JK_FALSE, l);
1147             for (i = 0; i < num_of_workers; i++) {
1148                 /* Copy the shared state info */
1149                 p->states[i] = p->worker->lb_workers[i].s->state;
1150             }
1151             attempt = 0;
1152         }
1153         rc = JK_FALSE;
1154         *is_error = JK_HTTP_SERVER_BUSY;
1155         i = get_most_suitable_worker(s, p->worker, sessionid, p->states, l);
1156         if (i >= 0) {
1157             int r;
1158             int is_service_error = JK_HTTP_OK;
1159             lb_sub_worker_t *rec = &(p->worker->lb_workers[i]);
1160             ajp_worker_t *aw = (ajp_worker_t *)rec->worker->worker_private;
1161             jk_endpoint_t *end = NULL;
1162             int activation = s->extension.activation ?
1163                              s->extension.activation[i] :
1164                              JK_LB_ACTIVATION_UNSET;
1165             if (activation == JK_LB_ACTIVATION_UNSET)
1166                 activation = rec->activation;
1167             if (!s->route)
1168                 s->route = rec->route;
1169             s->activation = jk_lb_get_activation_direct(activation, l);
1170             prec = rec;
1171
1172             if (JK_IS_DEBUG_LEVEL(l))
1173                 jk_log(l, JK_LOG_DEBUG,
1174                        "service worker=%s route=%s",
1175                        rec->name, s->route);
1176
1177             if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1178                 jk_shm_lock();
1179             if (rec->s->state == JK_LB_STATE_RECOVER) {
1180                 rec->s->state  = JK_LB_STATE_PROBE;
1181                 p->states[rec->i] = JK_LB_STATE_PROBE;
1182             }
1183             if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1184                 jk_shm_unlock();
1185
1186             r = rec->worker->get_endpoint(rec->worker, &end, l);
1187             if (!r || !end) {
1188                 /* If we can not get the endpoint
1189                  * mark the worker as busy rather then
1190                  * as in error if the retry number is
1191                  * greater then the number of retries.
1192                  */
1193                 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1194                     jk_shm_lock();
1195                 if (rec->s->state != JK_LB_STATE_ERROR) {
1196                     rec->s->state  = JK_LB_STATE_BUSY;
1197                     p->states[rec->i] = JK_LB_STATE_BUSY;
1198                 }
1199                 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1200                     jk_shm_unlock();
1201                 jk_log(l, JK_LOG_INFO,
1202                        "could not get free endpoint for worker %s (%d retries)",
1203                        rec->name, retry);
1204             }
1205             else {
1206                 int service_stat = JK_UNSET;
1207                 jk_uint64_t rd = 0;
1208                 jk_uint64_t wr = 0;
1209                 /* Reset endpoint read and write sizes for
1210                  * this request.
1211                  */
1212                 end->rd = end->wr = 0;
1213                 end->recoverable = JK_TRUE;
1214                 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1215                     jk_shm_lock();
1216
1217                 /* Increment the number of workers serving request */
1218                 p->worker->s->busy++;
1219                 rec->s->busy++;
1220                 if (p->worker->s->busy > p->worker->s->max_busy)
1221                     p->worker->s->max_busy = p->worker->s->busy;
1222                 if ( (p->worker->lbmethod == JK_LB_METHOD_REQUESTS) ||
1223                      (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) ||
1224                      (p->worker->lbmethod == JK_LB_METHOD_SESSIONS &&
1225                       !sessionid) )
1226                     rec->s->lb_value += rec->lb_mult;
1227                 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1228                     jk_shm_unlock();
1229
1230                 service_stat = end->service(end, s, l, &is_service_error);
1231                 rd = end->rd;
1232                 wr = end->wr;
1233                 recoverable = end->recoverable;
1234                 *is_error = is_service_error;
1235                 end->done(&end, l);
1236
1237                 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1238                     jk_shm_lock();
1239
1240                 /* Update partial reads and writes if any */
1241                 if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) {
1242                     rec->s->lb_value += (rd+wr)*rec->lb_mult;
1243                 }
1244                 else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) {
1245                     if (rec->s->lb_value >= rec->lb_mult) {
1246                         rec->s->lb_value -= rec->lb_mult;
1247                     }
1248                     else {
1249                         rec->s->lb_value = 0;
1250                         if (JK_IS_DEBUG_LEVEL(l)) {
1251                             jk_log(l, JK_LOG_DEBUG,
1252                                    "worker %s has load value to low (%"
1253                                    JK_UINT64_T_FMT
1254                                    " < %"
1255                                    JK_UINT64_T_FMT
1256                                    ") ",
1257                                    "- correcting to 0",
1258                                    rec->name,
1259                                    rec->s->lb_value,
1260                                    rec->lb_mult);
1261                         }
1262                     }
1263                 }
1264
1265                 /* When have an endpoint and we ran a request, assume
1266                  * we are OK, unless we last were in error.
1267                  * We will below explicitely set OK or ERROR according
1268                  * to the returned service_stat.
1269                  */
1270                 if (rec->s->state != JK_LB_STATE_ERROR) {
1271                     rec->s->state  = JK_LB_STATE_OK;
1272                     p->states[rec->i] = JK_LB_STATE_OK;
1273                 }
1274                 /* Decrement the busy worker count.
1275                  * Check if the busy was reset to zero by graceful
1276                  * restart of the server.
1277                  */
1278                 if (p->worker->s->busy)
1279                     p->worker->s->busy--;
1280                 if (rec->s->busy)
1281                     rec->s->busy--;
1282                 if (service_stat == JK_TRUE) {
1283                     /*
1284                      * Successful request.
1285                      */
1286                     rec->s->state  = JK_LB_STATE_OK;
1287                     p->states[rec->i] = JK_LB_STATE_OK;
1288                     rec->s->error_time = 0;
1289                     rc = JK_TRUE;
1290                     recoverable = JK_UNSET;
1291                 }
1292                 else if (service_stat == JK_CLIENT_ERROR) {
1293                     /*
1294                      * Client error !!!
1295                      * Since this is bad request do not fail over.
1296                      */
1297                     rec->s->state  = JK_LB_STATE_OK;
1298                     p->states[rec->i] = JK_LB_STATE_ERROR;
1299                     rec->s->error_time = 0;
1300                     rc = JK_CLIENT_ERROR;
1301                     recoverable = JK_FALSE;
1302                 }
1303                 else if (service_stat == JK_SERVER_ERROR) {
1304                     /*
1305                      * Internal JK server error.
1306                      * Keep previous global state.
1307                      * Do not try to reuse the same node for the same request.
1308                      * Failing over to another node could help.
1309                      */
1310                     p->states[rec->i] = JK_LB_STATE_ERROR;
1311                     rc = JK_FALSE;
1312                 }
1313                 else if (service_stat == JK_AJP_PROTOCOL_ERROR) {
1314                     /*
1315                      * We've received a bad AJP message from the backend.
1316                      * Keep previous global state.
1317                      * Do not try to reuse the same node for the same request.
1318                      * Failing over to another node could help.
1319                      */
1320                     p->states[rec->i] = JK_LB_STATE_ERROR;
1321                     rc = JK_FALSE;
1322                 }
1323                 else if (service_stat == JK_STATUS_ERROR) {
1324                     /*
1325                      * Status code configured as service is down.
1326                      * The node is fine.
1327                      * Do not try to reuse the same node for the same request.
1328                      * Failing over to another node could help.
1329                      */
1330                     rec->s->state  = JK_LB_STATE_OK;
1331                     p->states[rec->i] = JK_LB_STATE_ERROR;
1332                     rec->s->error_time = 0;
1333                     rc = JK_FALSE;
1334                 }
1335                 else if (service_stat == JK_STATUS_FATAL_ERROR) {
1336                     /*
1337                      * Status code configured as service is down.
1338                      * Mark the node as bad.
1339                      * Do not try to reuse the same node for the same request.
1340                      * Failing over to another node could help.
1341                      */
1342                     rec->s->errors++;
1343                     rec->s->state = JK_LB_STATE_ERROR;
1344                     p->states[rec->i] = JK_LB_STATE_ERROR;
1345                     rec->s->error_time = time(NULL);
1346                     rc = JK_FALSE;
1347                 }
1348                 else if (service_stat == JK_REPLY_TIMEOUT) {
1349                     if (aw->s->reply_timeouts > (unsigned)p->worker->max_reply_timeouts) {
1350                         /*
1351                          * Service failed - to many reply timeouts
1352                          * Mark the node as bad.
1353                          * Do not try to reuse the same node for the same request.
1354                          * Failing over to another node could help.
1355                          */
1356                         rec->s->errors++;
1357                         rec->s->state = JK_LB_STATE_ERROR;
1358                         p->states[rec->i] = JK_LB_STATE_ERROR;
1359                         rec->s->error_time = time(NULL);
1360                     }
1361                     else {
1362                         /*
1363                          * Reply timeout, bot not yet too many of them.
1364                          * Keep previous global state.
1365                          * Do not try to reuse the same node for the same request.
1366                          * Failing over to another node could help.
1367                          */
1368                         p->states[rec->i] = JK_LB_STATE_ERROR;
1369                     }
1370                     rc = JK_FALSE;
1371                 }
1372                 else {
1373                     /*
1374                      * Various unspecific error cases.
1375                      * Keep previous global state, if we are not in local error since to long.
1376                      * Do not try to reuse the same node for the same request.
1377                      * Failing over to another node could help.
1378                      */
1379                     time_t now = time(NULL);
1380                     rec->s->errors++;
1381                     if (rec->s->busy == 0 ||
1382                         p->worker->error_escalation_time == 0 ||
1383                         (rec->s->error_time > 0 &&
1384                          (int)difftime(now, rec->s->error_time) >= p->worker->error_escalation_time)) {
1385                         if (JK_IS_DEBUG_LEVEL(l))
1386                             jk_log(l, JK_LOG_DEBUG,
1387                                    "worker %s escalating local error to global error",
1388                                    rec->name);
1389                         rec->s->state = JK_LB_STATE_ERROR;
1390                     }
1391                     p->states[rec->i] = JK_LB_STATE_ERROR;
1392                     if (rec->s->error_time == 0) {
1393                         rec->s->error_time = now;
1394                     }
1395                     rc = JK_FALSE;
1396                 }
1397                 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1398                     jk_shm_unlock();
1399                 if (p->states[rec->i] == JK_LB_STATE_ERROR)
1400                     jk_log(l, JK_LOG_INFO,
1401                            "service failed, worker %s is in %serror state",
1402                            rec->name,
1403                            rec->s->state == JK_LB_STATE_ERROR ? "" : "local ");
1404             }
1405             if (recoverable == JK_TRUE) {
1406                 /*
1407                  * Error is recoverable by submitting the request to
1408                  * another worker... Lets try to do that.
1409                  */
1410                 if (JK_IS_DEBUG_LEVEL(l))
1411                     jk_log(l, JK_LOG_DEBUG,
1412                            "recoverable error... will try to recover on other worker");
1413             }
1414             else {
1415                 /*
1416                  * Error is not recoverable - break with an error.
1417                  */
1418                 if (rc == JK_CLIENT_ERROR)
1419                     jk_log(l, JK_LOG_INFO,
1420                            "unrecoverable error %d, request failed."
1421                            " Client failed in the middle of request,"
1422                            " we can't recover to another instance.",
1423                            *is_error);
1424                 else if (rc != JK_TRUE)
1425                     jk_log(l, JK_LOG_ERROR,
1426                            "unrecoverable error %d, request failed."
1427                            " Tomcat failed in the middle of request,"
1428                            " we can't recover to another instance.",
1429                            *is_error);
1430             }
1431             if (first == 1 && s->add_log_items) {
1432                 first = 0;
1433                 lb_add_log_items(s, lb_first_log_names, prec, l);
1434             }
1435         }
1436         else {
1437             /* No more workers left ... */
1438             if (!was_forced) {
1439                 int nf;
1440                 /* Force recovery only once.
1441                  * If it still fails, Tomcat is still disconnected.
1442                  */
1443                 jk_shm_lock();
1444                 nf = force_recovery(p->worker, p->states, l);
1445                 jk_shm_unlock();
1446                 was_forced = 1;
1447                 if (nf) {
1448                     /* We have forced recovery.
1449                      * Reset the service loop and go again
1450                      */
1451                     prec = NULL;
1452                     jk_log(l, JK_LOG_INFO,
1453                            "Forcing recovery once for %d workers", nf);
1454                     continue;
1455                 }
1456                 else {
1457                     /* No workers in error state.
1458                      * Somebody set them all to disabled?
1459                      */
1460                     jk_log(l, JK_LOG_INFO,
1461                            "All tomcat instances failed, no more workers "
1462                            "left for recovery (attempt=%d, retry=%d)",
1463                            attempt, retry);
1464                     *is_error = JK_HTTP_SERVER_BUSY;
1465                     rc = JK_FALSE;
1466                 }
1467             }
1468             else {
1469                 jk_log(l, JK_LOG_INFO,
1470                        "All tomcat instances failed, no more workers "
1471                        "left (attempt=%d, retry=%d)",
1472                        attempt, retry);
1473                 *is_error = JK_HTTP_SERVER_BUSY;
1474                 rc = JK_FALSE;
1475             }
1476         }
1477         attempt++;
1478     }
1479     if (recoverable == JK_TRUE) {
1480         jk_log(l, JK_LOG_INFO,
1481                "All tomcat instances are busy or in error state");
1482         /* rc and http error must be set above */
1483     }
1484     if (rc == JK_FALSE) {
1485         jk_log(l, JK_LOG_ERROR,
1486                "All tomcat instances failed, no more workers left");
1487     }
1488     if (prec && s->add_log_items) {
1489         lb_add_log_items(s, lb_last_log_names, prec, l);
1490     }
1491
1492     JK_TRACE_EXIT(l);
1493     return rc;
1494 }
1495
1496 static int JK_METHOD done(jk_endpoint_t **e, jk_logger_t *l)
1497 {
1498     JK_TRACE_ENTER(l);
1499
1500     if (e && *e && (*e)->endpoint_private) {
1501         lb_endpoint_t *p = (*e)->endpoint_private;
1502         free(p->states);
1503         free(p);
1504         *e = NULL;
1505         JK_TRACE_EXIT(l);
1506         return JK_TRUE;
1507     }
1508
1509     JK_LOG_NULL_PARAMS(l);
1510     JK_TRACE_EXIT(l);
1511     return JK_FALSE;
1512 }
1513
1514 static int JK_METHOD validate(jk_worker_t *pThis,
1515                               jk_map_t *props,
1516                               jk_worker_env_t *we, jk_logger_t *l)
1517 {
1518     JK_TRACE_ENTER(l);
1519
1520     if (pThis && pThis->worker_private) {
1521         lb_worker_t *p = pThis->worker_private;
1522         char **worker_names;
1523         unsigned int num_of_workers;
1524         const char *secret;
1525
1526         p->sticky_session = jk_get_is_sticky_session(props, p->name);
1527         p->sticky_session_force = jk_get_is_sticky_session_force(props, p->name);
1528         secret = jk_get_worker_secret(props, p->name);
1529
1530         if (jk_get_lb_worker_list(props,
1531                                   p->name,
1532                                   &worker_names,
1533                                   &num_of_workers) && num_of_workers) {
1534             unsigned int i = 0;
1535             unsigned int j = 0;
1536             p->max_packet_size = DEF_BUFFER_SZ;
1537             p->lb_workers = jk_pool_alloc(&p->p,
1538                                           num_of_workers *
1539                                           sizeof(lb_sub_worker_t));
1540             if (!p->lb_workers) {
1541                 JK_TRACE_EXIT(l);
1542                 return JK_FALSE;
1543             }
1544             memset(p->lb_workers, 0, num_of_workers * sizeof(lb_sub_worker_t));
1545             for (i = 0; i < num_of_workers; i++) {
1546                 p->lb_workers[i].s = jk_shm_alloc_lb_sub_worker(&p->p);
1547                 if (p->lb_workers[i].s == NULL) {
1548                     jk_log(l, JK_LOG_ERROR,
1549                            "allocating lb sub worker record from shared memory");
1550                     JK_TRACE_EXIT(l);
1551                     return JK_FALSE;
1552                 }
1553             }
1554
1555             for (i = 0; i < num_of_workers; i++) {
1556                 const char *s;
1557                 unsigned int ms;
1558
1559                 p->lb_workers[i].i = i;
1560                 strncpy(p->lb_workers[i].name, worker_names[i],
1561                         JK_SHM_STR_SIZ);
1562                 strncpy(p->lb_workers[i].s->h.name, worker_names[i],
1563                         JK_SHM_STR_SIZ);
1564                 p->lb_workers[i].sequence = 0;
1565                 p->lb_workers[i].s->h.sequence = 0;
1566                 p->lb_workers[i].lb_factor =
1567                     jk_get_lb_factor(props, worker_names[i]);
1568                 if (p->lb_workers[i].lb_factor < 1) {
1569                     p->lb_workers[i].lb_factor = 1;
1570                 }
1571                 /* Calculate the maximum packet size from all workers
1572                  * for the recovery buffer.
1573                  */
1574                 ms = jk_get_max_packet_size(props, worker_names[i]);
1575                 if (ms > p->max_packet_size)
1576                     p->max_packet_size = ms;
1577                 p->lb_workers[i].distance =
1578                     jk_get_distance(props, worker_names[i]);
1579                 if ((s = jk_get_worker_route(props, worker_names[i], NULL)))
1580                     strncpy(p->lb_workers[i].route, s, JK_SHM_STR_SIZ);
1581                 else
1582                     strncpy(p->lb_workers[i].route, worker_names[i], JK_SHM_STR_SIZ);
1583                 if ((s = jk_get_worker_domain(props, worker_names[i], NULL)))
1584                     strncpy(p->lb_workers[i].domain, s, JK_SHM_STR_SIZ);
1585                 if ((s = jk_get_worker_redirect(props, worker_names[i], NULL)))
1586                     strncpy(p->lb_workers[i].redirect, s, JK_SHM_STR_SIZ);
1587
1588                 p->lb_workers[i].s->lb_value = 0;
1589                 p->lb_workers[i].s->state = JK_LB_STATE_IDLE;
1590                 p->lb_workers[i].s->error_time = 0;
1591                 p->lb_workers[i].activation =
1592                     jk_get_worker_activation(props, worker_names[i]);
1593                 if (!wc_create_worker(p->lb_workers[i].name, 0,
1594                                       props,
1595                                       &(p->lb_workers[i].worker),
1596                                       we, l) || !p->lb_workers[i].worker) {
1597                     break;
1598                 }
1599                 if (secret && (p->lb_workers[i].worker->type == JK_AJP13_WORKER_TYPE ||
1600                     p->lb_workers[i].worker->type == JK_AJP14_WORKER_TYPE)) {
1601                     ajp_worker_t *aw = (ajp_worker_t *)p->lb_workers[i].worker->worker_private;
1602                     if (!aw->secret)
1603                         aw->secret = secret;
1604                 }
1605                 if (p->lb_workers[i].worker->type == JK_AJP13_WORKER_TYPE ||
1606                     p->lb_workers[i].worker->type == JK_AJP14_WORKER_TYPE) {
1607                     ajp_worker_t *aw = (ajp_worker_t *)p->lb_workers[i].worker->worker_private;
1608                     if (aw->port == 0) {
1609                         p->lb_workers[i].activation = JK_LB_ACTIVATION_STOPPED;
1610                     }
1611                 }
1612             }
1613
1614             if (i != num_of_workers) {
1615                 jk_log(l, JK_LOG_ERROR,
1616                        "Failed creating worker %s",
1617                        p->lb_workers[i].name);
1618                 close_workers(p, i, l);
1619             }
1620             else {
1621                 /* Update domain names if route contains period '.' */
1622                 for (i = 0; i < num_of_workers; i++) {
1623                     if (!p->lb_workers[i].domain[0]) {
1624                         char *id_domain = strchr(p->lb_workers[i].route, '.');
1625                         if (id_domain) {
1626                             *id_domain = '\0';
1627                             strcpy(p->lb_workers[i].domain, p->lb_workers[i].route);
1628                             *id_domain = '.';
1629                         }
1630                     }
1631                     if (JK_IS_DEBUG_LEVEL(l)) {
1632                         jk_log(l, JK_LOG_DEBUG,
1633                                "Balanced worker %i has name %s and route %s in domain %s",
1634                                i,
1635                                p->lb_workers[i].name,
1636                                p->lb_workers[i].route,
1637                                p->lb_workers[i].domain);
1638                     }
1639                 }
1640                 p->num_of_workers = num_of_workers;
1641                 update_mult(p, l);
1642                 for (i = 0; i < num_of_workers; i++) {
1643                     for (j = 0; j < i; j++) {
1644                         if (strcmp(p->lb_workers[i].route, p->lb_workers[j].route) == 0) {
1645                             jk_log(l, JK_LOG_ERROR,
1646                                    "Balanced workers number %i (%s) and %i (%s) share the same route %s - aborting configuration!",
1647                                    i,
1648                                    p->lb_workers[i].name,
1649                                    j,
1650                                    p->lb_workers[j].name,
1651                                    p->lb_workers[i].route);
1652                             JK_TRACE_EXIT(l);
1653                             return JK_FALSE;
1654                         }
1655                     }
1656                 }
1657                 JK_TRACE_EXIT(l);
1658                 return JK_TRUE;
1659             }
1660         }
1661     }
1662
1663     JK_LOG_NULL_PARAMS(l);
1664     JK_TRACE_EXIT(l);
1665     return JK_FALSE;
1666 }
1667
1668 static int JK_METHOD init(jk_worker_t *pThis,
1669                           jk_map_t *props,
1670                           jk_worker_env_t *we, jk_logger_t *log)
1671 {
1672     int i;
1673
1674     lb_worker_t *p = (lb_worker_t *)pThis->worker_private;
1675     JK_TRACE_ENTER(log);
1676
1677     p->worker.we = we;
1678     p->retries = jk_get_worker_retries(props, p->name,
1679                                        JK_RETRIES);
1680     p->retry_interval =
1681             jk_get_worker_retry_interval(props, p->name,
1682                                         JK_SLEEP_DEF);
1683     p->recover_wait_time = jk_get_worker_recover_timeout(props, p->name,
1684                                                          WAIT_BEFORE_RECOVER);
1685     if (p->recover_wait_time < 1)
1686         p->recover_wait_time = 1;
1687     p->error_escalation_time = jk_get_worker_error_escalation_time(props, p->name,
1688                                                                    p->recover_wait_time / 2);
1689     p->max_reply_timeouts = jk_get_worker_max_reply_timeouts(props, p->name,
1690                                                              0);
1691     p->maintain_time = jk_get_worker_maintain_time(props);
1692     if(p->maintain_time < 0)
1693         p->maintain_time = 0;
1694     p->s->last_maintain_time = time(NULL);
1695     p->s->last_reset = p->s->last_maintain_time;
1696
1697     p->lbmethod = jk_get_lb_method(props, p->name);
1698     p->lblock   = jk_get_lb_lock(props, p->name);
1699     strncpy(p->session_cookie,
1700             jk_get_lb_session_cookie(props, p->name, JK_SESSION_IDENTIFIER),
1701             JK_SHM_STR_SIZ);
1702     strncpy(p->session_path,
1703             jk_get_lb_session_path(props, p->name, JK_PATH_SESSION_IDENTIFIER),
1704             JK_SHM_STR_SIZ);
1705     strcpy(p->s->session_cookie, p->session_cookie);
1706     strcpy(p->s->session_path, p->session_path);
1707
1708     JK_INIT_CS(&(p->cs), i);
1709     if (i == JK_FALSE) {
1710         jk_log(log, JK_LOG_ERROR,
1711                "creating thread lock (errno=%d)",
1712                errno);
1713         JK_TRACE_EXIT(log);
1714         return JK_FALSE;
1715     }
1716
1717     p->sequence++;
1718     jk_lb_push(p, JK_FALSE, log);
1719
1720     JK_TRACE_EXIT(log);
1721     return JK_TRUE;
1722 }
1723
1724 static int JK_METHOD get_endpoint(jk_worker_t *pThis,
1725                                   jk_endpoint_t **pend, jk_logger_t *l)
1726 {
1727     JK_TRACE_ENTER(l);
1728
1729     if (pThis && pThis->worker_private && pend) {
1730         lb_endpoint_t *p = (lb_endpoint_t *) malloc(sizeof(lb_endpoint_t));
1731         p->worker = pThis->worker_private;
1732         p->endpoint.endpoint_private = p;
1733         p->endpoint.service = service;
1734         p->endpoint.done = done;
1735         p->states = (int *)malloc((p->worker->num_of_workers + 1) * sizeof(int));
1736         if (!p->states) {
1737             free(p);
1738             jk_log(l, JK_LOG_ERROR,
1739                    "Failed allocating private worker state memory");
1740             JK_TRACE_EXIT(l);
1741             return JK_FALSE;
1742         }
1743         *pend = &p->endpoint;
1744         JK_TRACE_EXIT(l);
1745         return JK_TRUE;
1746     }
1747     else {
1748         JK_LOG_NULL_PARAMS(l);
1749     }
1750
1751     JK_TRACE_EXIT(l);
1752     return JK_FALSE;
1753 }
1754
1755 static int JK_METHOD destroy(jk_worker_t **pThis, jk_logger_t *l)
1756 {
1757     JK_TRACE_ENTER(l);
1758
1759     if (pThis && *pThis && (*pThis)->worker_private) {
1760         unsigned int i;
1761         lb_worker_t *private_data = (*pThis)->worker_private;
1762
1763         close_workers(private_data, private_data->num_of_workers, l);
1764         JK_DELETE_CS(&(private_data->cs), i);
1765         jk_close_pool(&private_data->p);
1766         free(private_data);
1767
1768         JK_TRACE_EXIT(l);
1769         return JK_TRUE;
1770     }
1771
1772     JK_LOG_NULL_PARAMS(l);
1773     JK_TRACE_EXIT(l);
1774     return JK_FALSE;
1775 }
1776
1777 int JK_METHOD lb_worker_factory(jk_worker_t **w,
1778                                 const char *name, jk_logger_t *l)
1779 {
1780     JK_TRACE_ENTER(l);
1781
1782     if (NULL != name && NULL != w) {
1783         lb_worker_t *private_data =
1784             (lb_worker_t *) calloc(1, sizeof(lb_worker_t));
1785
1786
1787         jk_open_pool(&private_data->p,
1788                         private_data->buf,
1789                         sizeof(jk_pool_atom_t) * TINY_POOL_SIZE);
1790
1791         private_data->s = jk_shm_alloc_lb_worker(&private_data->p);
1792         if (!private_data->s) {
1793             free(private_data);
1794             JK_TRACE_EXIT(l);
1795             return 0;
1796         }
1797         strncpy(private_data->name, name, JK_SHM_STR_SIZ);
1798         strncpy(private_data->s->h.name, name, JK_SHM_STR_SIZ);
1799         private_data->lb_workers = NULL;
1800         private_data->num_of_workers = 0;
1801         private_data->worker.worker_private = private_data;
1802         private_data->worker.validate = validate;
1803         private_data->worker.init = init;
1804         private_data->worker.get_endpoint = get_endpoint;
1805         private_data->worker.destroy = destroy;
1806         private_data->worker.maintain = maintain_workers;
1807         private_data->recover_wait_time = WAIT_BEFORE_RECOVER;
1808         private_data->error_escalation_time = private_data->recover_wait_time / 2;
1809         private_data->max_reply_timeouts = 0;
1810         private_data->sequence = 0;
1811         private_data->s->h.sequence = 0;
1812         private_data->next_offset = 0;
1813         *w = &private_data->worker;
1814         JK_TRACE_EXIT(l);
1815         return JK_LB_WORKER_TYPE;
1816     }
1817     else {
1818         JK_LOG_NULL_PARAMS(l);
1819     }
1820
1821     JK_TRACE_EXIT(l);
1822     return 0;
1823 }