2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 /***************************************************************************
19 * Description: Load balancer worker, knows how to load balance among *
21 * Author: Gal Shachor <shachor@il.ibm.com> *
22 * Author: Mladen Turk <mturk@apache.org> *
23 * Author: Rainer Jung <rjung@apache.org> *
25 * Version: $Revision: 1137200 $ *
26 ***************************************************************************/
29 #include "jk_service.h"
31 #include "jk_worker.h"
32 #include "jk_lb_worker.h"
34 #include "jk_ajp13_worker.h"
35 #include "jk_ajp14_worker.h"
40 * The load balancing code in this
44 * The following two macros need to be kept in sync with
45 * the existing values for state and activation.
46 * Note: state <= JK_LB_STATE_FORCE is equivalent to
47 * state is none of JK_LB_STATE_BUSY, JK_LB_STATE_ERROR, JK_LB_STATE_PROBE
48 * Note: state <= JK_LB_STATE_BUSY is equivalent to
49 * state is none of JK_LB_STATE_ERROR, JK_LB_STATE_PROBE
50 * Note: activation == JK_LB_ACTIVATION_ACTIVE is equivalent to
51 * activation is none of JK_LB_ACTIVATION_STOPPED, JK_LB_ACTIVATION_DISABLED
/*
 * JK_WORKER_USABLE: member may take new (non-sticky) requests -- its state
 * is at most JK_LB_STATE_FORCE and its activation is ACTIVE.
 * JK_WORKER_USABLE_STICKY: member may take requests of an existing session
 * -- its state is at most JK_LB_STATE_BUSY and it is not STOPPED.
 * Both arguments are parenthesized in the expansion so callers can pass
 * arbitrary expressions (e.g. a conditional) without precedence surprises.
 */
#define JK_WORKER_USABLE(s, activation) ((s) <= JK_LB_STATE_FORCE && (activation) == JK_LB_ACTIVATION_ACTIVE)
#define JK_WORKER_USABLE_STICKY(s, activation) ((s) <= JK_LB_STATE_BUSY && (activation) != JK_LB_ACTIVATION_STOPPED)
// Text names of the lb lock types. jk_lb_get_lock() indexes this table with
// lb_worker_t.lblock, so the entry order must match the lock codes.
56 static const char *lb_locking_type[] = {
57 JK_LB_LOCK_TEXT_OPTIMISTIC,
58 JK_LB_LOCK_TEXT_PESSIMISTIC,
// Text names of the lb methods. jk_lb_get_method() indexes this table with
// lb_worker_t.lbmethod, so the entry order must match the method codes.
63 static const char *lb_method_type[] = {
64 JK_LB_METHOD_TEXT_REQUESTS,
65 JK_LB_METHOD_TEXT_TRAFFIC,
66 JK_LB_METHOD_TEXT_BUSYNESS,
67 JK_LB_METHOD_TEXT_SESSIONS,
// Text names of the member states. jk_lb_get_state() indexes this table with
// the shared state value, so the entry order must match the state codes
// (jk_lb_get_state_code() knows idle, ok, recover, force, busy, error, probe).
72 static const char *lb_state_type[] = {
73 JK_LB_STATE_TEXT_IDLE,
75 JK_LB_STATE_TEXT_RECOVER,
76 JK_LB_STATE_TEXT_FORCE,
77 JK_LB_STATE_TEXT_BUSY,
78 JK_LB_STATE_TEXT_ERROR,
79 JK_LB_STATE_TEXT_PROBE,
// Text names of the member activations, indexed by activation code in
// jk_lb_get_activation() and jk_lb_get_activation_direct().
84 static const char *lb_activation_type[] = {
85 JK_LB_ACTIVATION_TEXT_ACTIVE,
86 JK_LB_ACTIVATION_TEXT_DISABLED,
87 JK_LB_ACTIVATION_TEXT_STOPPED,
// Request-note names for the "first" member (presumably the first member
// tried for a request). lb_add_log_items() pairs them positionally with its
// log_values[0..8]: name, value, accessed, read, transferred, errors, busy,
// activation, state.
92 static const char *lb_first_log_names[] = {
93 JK_NOTE_LB_FIRST_NAME,
94 JK_NOTE_LB_FIRST_VALUE,
95 JK_NOTE_LB_FIRST_ACCESSED,
96 JK_NOTE_LB_FIRST_READ,
97 JK_NOTE_LB_FIRST_TRANSFERRED,
98 JK_NOTE_LB_FIRST_ERRORS,
99 JK_NOTE_LB_FIRST_BUSY,
100 JK_NOTE_LB_FIRST_ACTIVATION,
101 JK_NOTE_LB_FIRST_STATE,
// Request-note names for the "last" member (presumably the member that
// finally served the request). The order must match lb_first_log_names,
// because lb_add_log_items() fills log_values[0..8] in that order.
105 static const char *lb_last_log_names[] = {
106 JK_NOTE_LB_LAST_NAME,
107 JK_NOTE_LB_LAST_VALUE,
108 JK_NOTE_LB_LAST_ACCESSED,
109 JK_NOTE_LB_LAST_READ,
110 JK_NOTE_LB_LAST_TRANSFERRED,
111 JK_NOTE_LB_LAST_ERRORS,
112 JK_NOTE_LB_LAST_BUSY,
113 JK_NOTE_LB_LAST_ACTIVATION,
114 JK_NOTE_LB_LAST_STATE,
// NOTE(review): service() reads e->endpoint_private into p and uses
// p->worker (the owning lb worker) and p->states (a per-request copy of the
// member states); presumably p is this lb_endpoint_t -- confirm.
121 jk_endpoint_t endpoint;
124 typedef struct lb_endpoint lb_endpoint_t;
127 /* Calculate the greatest common divisor of two positive integers */
// Helper for scm(), which update_mult() folds over the members' lb_factor.
128 static jk_uint64_t gcd(jk_uint64_t a, jk_uint64_t b)
144 /* Calculate the smallest common multiple of two positive integers */
145 static jk_uint64_t scm(jk_uint64_t a, jk_uint64_t b)
147 return a * b / gcd(a, b);
150 /* Return the string representation of the lb lock type */
// p: lb worker; its lblock selects the entry of lb_locking_type.
// l: logger, unused.
// NOTE(review): lblock is used as an array index without a range check.
151 const char *jk_lb_get_lock(lb_worker_t *p, jk_logger_t *l)
153 return lb_locking_type[p->lblock];
156 /* Return the int representation of the lb lock type */
// Decides on the first character of v only: o/O/0 -> optimistic,
// p/P/1 -> pessimistic; anything else yields JK_LB_LOCK_DEF.
157 int jk_lb_get_lock_code(const char *v)
160 return JK_LB_LOCK_DEF;
161 else if (*v == 'o' || *v == 'O' || *v == '0')
162 return JK_LB_LOCK_OPTIMISTIC;
163 else if (*v == 'p' || *v == 'P' || *v == '1')
164 return JK_LB_LOCK_PESSIMISTIC;
166 return JK_LB_LOCK_DEF;
169 /* Return the string representation of the lb method type */
// p: lb worker; its lbmethod selects the entry of lb_method_type.
// l: logger, unused.
// NOTE(review): lbmethod is used as an array index without a range check.
170 const char *jk_lb_get_method(lb_worker_t *p, jk_logger_t *l)
172 return lb_method_type[p->lbmethod];
175 /* Return the int representation of the lb method type */
// Decides on the first character of v only: r/R/0 requests, t/T/1 traffic,
// b/B/2 busyness, s/S/3 sessions; anything else yields JK_LB_METHOD_DEF.
176 int jk_lb_get_method_code(const char *v)
179 return JK_LB_METHOD_DEF;
180 else if (*v == 'r' || *v == 'R' || *v == '0')
181 return JK_LB_METHOD_REQUESTS;
182 else if (*v == 't' || *v == 'T' || *v == '1')
183 return JK_LB_METHOD_TRAFFIC;
184 else if (*v == 'b' || *v == 'B' || *v == '2')
185 return JK_LB_METHOD_BUSYNESS;
186 else if (*v == 's' || *v == 'S' || *v == '3')
187 return JK_LB_METHOD_SESSIONS;
189 return JK_LB_METHOD_DEF;
192 /* Return the string representation of the balance worker state */
// p: member; its shared-memory state p->s->state selects the entry of
// lb_state_type. l: logger, unused.
// NOTE(review): the shared state is used as an array index without a range
// check, so a corrupted shm value would read out of bounds.
193 const char *jk_lb_get_state(lb_sub_worker_t *p, jk_logger_t *l)
195 return lb_state_type[p->s->state];
198 /* Return the int representation of the lb state */
// Decides on the first character of v only: i/I/n/N/0 idle, o/O/1 ok,
// r/R/2 recover, f/F/3 force, b/B/4 busy, e/E/5 error, p/P/6 probe;
// anything else yields JK_LB_STATE_DEF.
199 int jk_lb_get_state_code(const char *v)
202 return JK_LB_STATE_DEF;
203 else if (*v == 'i' || *v == 'I' || *v == 'n' || *v == 'N' || *v == '0')
204 return JK_LB_STATE_IDLE;
205 else if (*v == 'o' || *v == 'O' || *v == '1')
206 return JK_LB_STATE_OK;
207 else if (*v == 'r' || *v == 'R' || *v == '2')
208 return JK_LB_STATE_RECOVER;
209 else if (*v == 'f' || *v == 'F' || *v == '3')
210 return JK_LB_STATE_FORCE;
211 else if (*v == 'b' || *v == 'B' || *v == '4')
212 return JK_LB_STATE_BUSY;
213 else if (*v == 'e' || *v == 'E' || *v == '5')
214 return JK_LB_STATE_ERROR;
215 else if (*v == 'p' || *v == 'P' || *v == '6')
216 return JK_LB_STATE_PROBE;
218 return JK_LB_STATE_DEF;
221 /* Return the string representation of the balance worker activation */
222 /* based on the integer representation */
// activation: activation code used directly as an index into
// lb_activation_type (not range-checked). l: logger, unused.
223 const char *jk_lb_get_activation_direct(int activation, jk_logger_t *l)
225 return lb_activation_type[activation];
228 /* Return the string representation of the balance worker activation */
229 /* based on the sub worker struct */
// Uses the member's process-local activation (p->activation, which
// jk_lb_pull() refreshes from shm), not a per-request override.
// l: logger, unused.
230 const char *jk_lb_get_activation(lb_sub_worker_t *p, jk_logger_t *l)
232 return lb_activation_type[p->activation];
// Return the int representation of the member activation. Decides on the
// first character of v only: a/A/0 active, d/D/1 disabled, s/S/2 stopped;
// anything else yields JK_LB_ACTIVATION_DEF.
235 int jk_lb_get_activation_code(const char *v)
238 return JK_LB_ACTIVATION_DEF;
239 else if (*v == 'a' || *v == 'A' || *v == '0')
240 return JK_LB_ACTIVATION_ACTIVE;
241 else if (*v == 'd' || *v == 'D' || *v == '1')
242 return JK_LB_ACTIVATION_DISABLED;
243 else if (*v == 's' || *v == 'S' || *v == '2')
244 return JK_LB_ACTIVATION_STOPPED;
246 return JK_LB_ACTIVATION_DEF;
249 /* Update the load multipliers wrt. lb_factor */
// Folds scm() over all members' lb_factor into s, then gives each member
// lb_mult = s / lb_factor. lb_mult is the per-request lb_value increment,
// so it is inversely proportional to lb_factor, and since the lowest
// lb_value wins, members with a larger lb_factor are chosen more often.
// NOTE(review): the fold is only correct if s starts at 1 -- confirm its
// initializer. lb_factor must be non-zero (it is a divisor). The running
// common multiple can overflow jk_uint64_t for many large, coprime factors.
250 void update_mult(lb_worker_t *p, jk_logger_t *l)
255 for (i = 0; i < p->num_of_workers; i++) {
256 s = scm(s, p->lb_workers[i].lb_factor);
258 for (i = 0; i < p->num_of_workers; i++) {
259 p->lb_workers[i].lb_mult = s / p->lb_workers[i].lb_factor;
260 if (JK_IS_DEBUG_LEVEL(l))
261 jk_log(l, JK_LOG_DEBUG,
262 "worker %s gets multiplicity %"
264 p->lb_workers[i].name,
265 p->lb_workers[i].lb_mult);
// Zero every member's shared lb_value. This is skipped for the busyness
// method: there lb_value counts in-flight load (service() adds lb_mult
// before and subtracts it after each request), so resetting it would lose
// the in-flight count.
270 /* Reset all lb values.
272 void reset_lb_values(lb_worker_t *p, jk_logger_t *l)
276 if (p->lbmethod != JK_LB_METHOD_BUSYNESS) {
277 for (i = 0; i < p->num_of_workers; i++) {
278 p->lb_workers[i].s->lb_value = 0;
284 /* Syncing config values from shm */
// Copy the lb worker's configuration from shared memory (p->s) into the
// process-local fields and record the shm sequence number. Every member
// whose local sequence differs from its shm sequence is refreshed as well,
// including its AJP settings via jk_ajp_pull().
// locked: the statements under "locked == JK_FALSE" are presumably the shm
// lock/unlock calls. Callers that already hold the lock pass JK_TRUE
// (recover_workers() does); service() passes JK_FALSE.
// NOTE(review): strncpy(dst, src, JK_SHM_STR_SIZ) leaves dst without a
// terminating NUL when src fills all JK_SHM_STR_SIZ bytes -- confirm the
// destination buffers are larger than JK_SHM_STR_SIZ and stay terminated.
285 void jk_lb_pull(lb_worker_t *p, int locked, jk_logger_t *l)
291 if (JK_IS_DEBUG_LEVEL(l))
292 jk_log(l, JK_LOG_DEBUG,
293 "syncing mem for lb '%s' from shm (%u->%u)",
294 p->name, p->sequence, p->s->h.sequence);
295 if (locked == JK_FALSE)
297 p->sticky_session = p->s->sticky_session;
298 p->sticky_session_force = p->s->sticky_session_force;
299 p->recover_wait_time = p->s->recover_wait_time;
300 p->error_escalation_time = p->s->error_escalation_time;
301 p->max_reply_timeouts = p->s->max_reply_timeouts;
302 p->retries = p->s->retries;
303 p->retry_interval = p->s->retry_interval;
304 p->lbmethod = p->s->lbmethod;
305 p->lblock = p->s->lblock;
306 p->max_packet_size = p->s->max_packet_size;
307 p->sequence = p->s->h.sequence;
308 strncpy(p->session_cookie, p->s->session_cookie, JK_SHM_STR_SIZ);
309 strncpy(p->session_path, p->s->session_path, JK_SHM_STR_SIZ);
311 for (i = 0; i < p->num_of_workers; i++) {
312 lb_sub_worker_t *w = &p->lb_workers[i];
313 if (w->sequence != w->s->h.sequence) {
314 jk_worker_t *jw = w->worker;
315 ajp_worker_t *aw = (ajp_worker_t *)jw->worker_private;
317 if (JK_IS_DEBUG_LEVEL(l))
318 jk_log(l, JK_LOG_DEBUG,
319 "syncing mem for member '%s' of lb '%s' from shm",
322 jk_ajp_pull(aw, JK_TRUE, l);
323 strncpy(w->route, w->s->route, JK_SHM_STR_SIZ);
324 strncpy(w->domain, w->s->domain, JK_SHM_STR_SIZ);
325 strncpy(w->redirect, w->s->redirect, JK_SHM_STR_SIZ);
326 w->distance = w->s->distance;
327 w->activation = w->s->activation;
328 w->lb_factor = w->s->lb_factor;
329 w->lb_mult = w->s->lb_mult;
330 w->sequence = w->s->h.sequence;
333 if (locked == JK_FALSE)
339 /* Syncing config values to shm */
// Reverse of jk_lb_pull(): write the process-local lb configuration into
// shared memory and publish the local sequence number there. A member is
// pushed (including its AJP settings via jk_ajp_push()) only when its local
// sequence differs from the shm one, presumably because the caller bumped
// w->sequence after changing it.
// locked: the statements under "locked == JK_FALSE" are presumably the shm
// lock/unlock calls.
// NOTE(review): strncpy with JK_SHM_STR_SIZ does not NUL-terminate a full
// buffer -- confirm the shm string fields are larger than JK_SHM_STR_SIZ.
340 void jk_lb_push(lb_worker_t *p, int locked, jk_logger_t *l)
346 if (JK_IS_DEBUG_LEVEL(l))
347 jk_log(l, JK_LOG_DEBUG,
348 "syncing shm for lb '%s' from mem (%u->%u)",
349 p->name, p->s->h.sequence, p->sequence);
350 if (locked == JK_FALSE)
352 p->s->sticky_session = p->sticky_session;
353 p->s->sticky_session_force = p->sticky_session_force;
354 p->s->recover_wait_time = p->recover_wait_time;
355 p->s->error_escalation_time = p->error_escalation_time;
356 p->s->max_reply_timeouts = p->max_reply_timeouts;
357 p->s->retries = p->retries;
358 p->s->retry_interval = p->retry_interval;
359 p->s->lbmethod = p->lbmethod;
360 p->s->lblock = p->lblock;
361 p->s->max_packet_size = p->max_packet_size;
362 p->s->h.sequence = p->sequence;
363 strncpy(p->s->session_cookie, p->session_cookie, JK_SHM_STR_SIZ);
364 strncpy(p->s->session_path, p->session_path, JK_SHM_STR_SIZ);
366 for (i = 0; i < p->num_of_workers; i++) {
367 lb_sub_worker_t *w = &p->lb_workers[i];
368 if (w->sequence != w->s->h.sequence) {
369 jk_worker_t *jw = w->worker;
370 ajp_worker_t *aw = (ajp_worker_t *)jw->worker_private;
372 if (JK_IS_DEBUG_LEVEL(l))
373 jk_log(l, JK_LOG_DEBUG,
374 "syncing shm for member '%s' of lb '%s' from mem",
377 jk_ajp_push(aw, JK_TRUE, l);
378 strncpy(w->s->route, w->route, JK_SHM_STR_SIZ);
379 strncpy(w->s->domain, w->domain, JK_SHM_STR_SIZ);
380 strncpy(w->s->redirect, w->redirect, JK_SHM_STR_SIZ);
381 w->s->distance = w->distance;
382 w->s->activation = w->activation;
383 w->s->lb_factor = w->lb_factor;
384 w->s->lb_mult = w->lb_mult;
385 w->s->h.sequence = w->sequence;
388 if (locked == JK_FALSE)
394 /* Retrieve the parameter with the given name */
// Scans s->req_uri for name immediately followed by '='. For a match with
// a non-empty value, the value is duplicated into s->pool and then cut at
// the first '?' (a query string, if one was appended) and at the first ';'
// (a following path parameter).
// NOTE(review): strstr matches name anywhere in the URI, not only right
// after a ';' path-parameter separator, so a longer parameter name that
// ends in name also matches -- confirm that is acceptable.
395 static char *get_path_param(jk_ws_service_t *s, const char *name)
397 char *id_start = NULL;
398 for (id_start = strstr(s->req_uri, name);
399 id_start; id_start = strstr(id_start + 1, name)) {
400 if (id_start[strlen(name)] == '=') {
402 * Session path-cookie was found, get it's value
404 id_start += (1 + strlen(name));
405 if (strlen(id_start)) {
407 id_start = jk_pool_strdup(s->pool, id_start);
409 * The query string is not part of req_uri, however
410 * to be on the safe side lets remove the trailing query
411 * string if appended...
413 if ((id_end = strchr(id_start, '?')) != NULL) {
417 * Remove any trailing path element.
419 if ((id_end = strchr(id_start, ';')) != NULL) {
430 /* Retrieve the cookie with the given name */
// Walks every header whose name is "cookie" (case-insensitive). A match is
// name at the start of the header value or right after ';', ',' or
// whitespace, followed by optional whitespace, '=' and a non-empty value.
// Each value runs up to the next ';' or ','. The first value found is
// copied into a pool buffer; later matches are appended to that buffer.
// NOTE(review): isspace((int)c) on a plain char is undefined for negative
// values (non-ASCII bytes on signed-char platforms); cast through
// (unsigned char) first.
// NOTE(review): no NULL check of the jk_pool_alloc / jk_pool_realloc
// results is visible here -- confirm they are checked before the memcpy
// and strncat.
431 static char *get_cookie(jk_ws_service_t *s, const char *name)
436 for (i = 0; i < s->num_headers; i++) {
437 if (strcasecmp(s->headers_names[i], "cookie") == 0) {
440 for (id_start = strstr(s->headers_values[i], name);
441 id_start; id_start = strstr(id_start + 1, name)) {
442 if (id_start == s->headers_values[i] ||
443 id_start[-1] == ';' ||
444 id_start[-1] == ',' || isspace((int)id_start[-1])) {
445 id_start += strlen(name);
446 while (*id_start && isspace((int)(*id_start)))
448 if (*id_start == '=' && id_start[1]) {
450 * Session cookie was found, get it's value
455 if ((id_end = strpbrk(id_start, ";,")) != NULL)
456 sz = id_end - id_start;
458 sz = strlen(id_start);
459 id_end = id_start + sz;
461 if (result == NULL) {
462 result = jk_pool_alloc(s->pool, sz + 1);
463 memcpy(result, id_start, sz);
467 size_t osz = strlen(result) + 1;
469 jk_pool_realloc(s->pool, osz + sz + 1, result, osz);
471 strncat(result, id_start, sz);
// Looks up the session id first as the path parameter named p->session_path
// and then (presumably only when that found nothing) as the cookie named
// p->session_cookie. An empty identifier is currently only logged at INFO.
484 /* Retrieve session id from the cookie or the parameter
487 static char *get_sessionid(jk_ws_service_t *s, lb_worker_t *p, jk_logger_t *l)
490 val = get_path_param(s, p->session_path);
492 val = get_cookie(s, p->session_cookie);
495 /* TODO: For now only log the empty sessions.
496 * However we should probably return 400
497 * (BAD_REQUEST) in this case
499 jk_log(l, JK_LOG_INFO,
500 "Detected empty session identifier.");
// Destroy the first num_of_workers members through each member worker's
// destroy callback, which receives the address of the member's worker
// pointer.
506 static void close_workers(lb_worker_t *p, int num_of_workers, jk_logger_t *l)
509 for (i = 0; i < num_of_workers; i++) {
510 p->lb_workers[i].worker->destroy(&(p->lb_workers[i].worker), l);
// Global-maintenance step, called from maintain_workers() with the lb_value
// maximum returned by decay_load(). It first pulls shm config if the
// sequence changed (the caller holds the lock, hence JK_TRUE). Per member:
//  - ERROR and recover_wait_time elapsed: move to RECOVER, clear the AJP
//    reply-timeout counter, and (except for busyness) seed lb_value with
//    curmax, presumably so the recovered member is not preferred for every
//    new request until its load catches up;
//  - otherwise, a local error older than error_escalation_time is escalated
//    to ERROR;
//  - OK with an unchanged AJP "used" count since the last snapshot becomes
//    IDLE; the snapshot is then refreshed.
514 /* If the worker is in error state run
515 * retry on that worker. It will be marked as
516 * operational if the retry timeout is elapsed.
517 * The worker might still be unusable, but we try
519 * If the worker is in ok state and got no requests
520 * since the last global maintenance, we mark its
521 * state as not available.
522 * Return the number of workers not in error state.
524 static int recover_workers(lb_worker_t *p,
532 lb_sub_worker_t *w = NULL;
533 ajp_worker_t *aw = NULL;
536 if (p->sequence != p->s->h.sequence)
537 jk_lb_pull(p, JK_TRUE, l);
538 for (i = 0; i < p->num_of_workers; i++) {
539 w = &p->lb_workers[i];
540 aw = (ajp_worker_t *)w->worker->worker_private;
541 if (w->s->state == JK_LB_STATE_ERROR) {
542 elapsed = (int)difftime(now, w->s->error_time);
543 if (elapsed <= p->recover_wait_time) {
544 if (JK_IS_DEBUG_LEVEL(l))
545 jk_log(l, JK_LOG_DEBUG,
546 "worker %s will recover in %d seconds",
547 w->name, p->recover_wait_time - elapsed);
550 if (JK_IS_DEBUG_LEVEL(l))
551 jk_log(l, JK_LOG_DEBUG,
552 "worker %s is marked for recovery",
554 if (p->lbmethod != JK_LB_METHOD_BUSYNESS)
555 w->s->lb_value = curmax;
556 aw->s->reply_timeouts = 0;
557 w->s->state = JK_LB_STATE_RECOVER;
561 else if (w->s->error_time > 0 &&
562 (int)difftime(now, w->s->error_time) >= p->error_escalation_time) {
563 if (JK_IS_DEBUG_LEVEL(l))
564 jk_log(l, JK_LOG_DEBUG,
565 "worker %s escalating local error to global error",
567 w->s->state = JK_LB_STATE_ERROR;
571 if (w->s->state == JK_LB_STATE_OK &&
572 aw->s->used == w->s->elected_snapshot)
573 w->s->state = JK_LB_STATE_IDLE;
575 w->s->elected_snapshot = aw->s->used;
// Move every member in ERROR to FORCE and clear its AJP reply-timeout
// counter, mirroring the new state into states[i]. maintain_workers()
// passes states == NULL, so the states[i] store must be guarded
// (presumably on a line not shown here).
// NOTE(review): the "marked for forced recovery" message is logged at INFO
// but guarded by JK_IS_DEBUG_LEVEL, so it only appears with debug logging
// enabled; either log at DEBUG or drop the guard.
582 static int force_recovery(lb_worker_t *p,
588 lb_sub_worker_t *w = NULL;
589 ajp_worker_t *aw = NULL;
592 for (i = 0; i < p->num_of_workers; i++) {
593 w = &p->lb_workers[i];
594 if (w->s->state == JK_LB_STATE_ERROR) {
595 if (JK_IS_DEBUG_LEVEL(l))
596 jk_log(l, JK_LOG_INFO,
597 "worker %s is marked for forced recovery",
599 aw = (ajp_worker_t *)w->worker->worker_private;
600 aw->s->reply_timeouts = 0;
601 w->s->state = JK_LB_STATE_FORCE;
603 states[i] = JK_LB_STATE_FORCE;
// Right-shift each member's lb_value by exponent (a division by
// 2^exponent), except for the busyness method. While doing so, track the
// largest decayed value in curmax (presumably the return value). The AJP
// reply-timeout counters are decayed the same way for every method.
// NOTE(review): exponent is JK_LB_DECAY_MULT * delta / maintain_time in
// maintain_workers(). After a long idle gap it can reach the bit width of
// lb_value / reply_timeouts, and shifting by >= the width is undefined
// behaviour -- clamp it, or zero the values in that case.
612 /* Divide old load values by the decay factor,
613 * such that older values get less important
614 * for the routing decisions.
616 static jk_uint64_t decay_load(lb_worker_t *p,
621 jk_uint64_t curmax = 0;
626 for (i = 0; i < p->num_of_workers; i++) {
627 w = &p->lb_workers[i];
628 if (p->lbmethod != JK_LB_METHOD_BUSYNESS) {
629 w->s->lb_value >>= exponent;
630 if (w->s->lb_value > curmax) {
631 curmax = w->s->lb_value;
634 aw = (ajp_worker_t *)w->worker->worker_private;
635 aw->s->reply_timeouts >>= exponent;
// Maintenance callback of the lb worker. It first runs each member worker's
// own maintain callback (when it has one). Then, at most once per
// maintain_time across all processes (shared last_maintain_time, with
// JK_LB_MAINTAIN_TOLERANCE slack), it decays the load values and runs
// recover_workers(). When that reports no member outside the error state,
// it forces recovery of all members in ERROR. A NULL p or a missing
// worker_private is logged via JK_LOG_NULL_PARAMS.
// NOTE(review): delta is divided by lb->maintain_time, which must be
// non-zero -- confirm it is validated when the worker is initialised.
641 static int JK_METHOD maintain_workers(jk_worker_t *p, time_t now, jk_logger_t *l)
644 jk_uint64_t curmax = 0;
648 if (p && p->worker_private) {
649 lb_worker_t *lb = (lb_worker_t *)p->worker_private;
651 for (i = 0; i < lb->num_of_workers; i++) {
652 if (lb->lb_workers[i].worker->maintain) {
653 lb->lb_workers[i].worker->maintain(lb->lb_workers[i].worker, now, l);
659 /* Now we check for global maintenance (once for all processes).
660 * Checking workers for recovery and applying decay to the
661 * load values should not be done by each process individually.
662 * Therefore we globally sync and we use a global timestamp.
663 * Since it's possible that we come here a few milliseconds
664 * before the interval has passed, we allow a little tolerance.
666 delta = (long)difftime(now, lb->s->last_maintain_time) + JK_LB_MAINTAIN_TOLERANCE;
667 if (delta >= lb->maintain_time) {
668 lb->s->last_maintain_time = now;
669 if (JK_IS_DEBUG_LEVEL(l))
670 jk_log(l, JK_LOG_DEBUG,
672 JK_LB_DECAY_MULT * delta / lb->maintain_time);
673 curmax = decay_load(lb, JK_LB_DECAY_MULT * delta / lb->maintain_time, l);
674 if (!recover_workers(lb, curmax, now, l)) {
675 force_recovery(lb, NULL, l);
683 JK_LOG_NULL_PARAMS(l);
// Presumably returns the index of the first member whose route equals
// session_route exactly, and a negative value when there is none (callers
// test the result with ">= 0").
690 static int find_by_session(jk_ws_service_t *s,
692 const char *session_route,
699 for (i = 0; i < p->num_of_workers; i++) {
700 if (strcmp(p->lb_workers[i].route, session_route) == 0) {
// Consider only members whose non-empty domain equals the domain part of
// route_or_domain: the text before the first '.', or the whole string.
// Among the members that are usable for new requests (JK_WORKER_USABLE,
// using the per-request activation override in s->extension.activation
// when it is set, else the member's own activation), prefer a smaller
// distance and then a lower lb_value.
// NOTE(review): idpart points into a const char * and should be declared
// const char *. wr = p->lb_workers[i] copies the whole sub-worker struct
// on every iteration; a pointer would avoid the copy.
708 static int find_best_bydomain(jk_ws_service_t *s,
710 const char *route_or_domain,
716 jk_uint64_t curmin = 0;
720 char *idpart = strchr(route_or_domain, '.');
721 size_t domain_len = 0;
724 domain_len = idpart - route_or_domain;
727 domain_len = strlen(route_or_domain);
729 /* First try to see if we have available candidate */
730 for (i = 0; i < p->num_of_workers; i++) {
731 /* Skip all workers that are not member of domain */
732 wr = p->lb_workers[i];
733 if (strlen(wr.domain) == 0 ||
734 strlen(wr.domain) != domain_len ||
735 strncmp(wr.domain, route_or_domain, domain_len))
737 /* Take into calculation only the workers that are
738 * not in error state, stopped, disabled or busy.
740 activation = s->extension.activation ?
741 s->extension.activation[i] :
742 JK_LB_ACTIVATION_UNSET;
743 if (activation == JK_LB_ACTIVATION_UNSET)
744 activation = wr.activation;
745 if (JK_WORKER_USABLE(states[wr.i], activation)) {
746 if (candidate < 0 || wr.distance < d ||
747 (wr.s->lb_value < curmin &&
750 curmin = wr.s->lb_value;
// Pick the usable member (JK_WORKER_USABLE with the per-request activation
// override) with the smallest distance and then the lowest lb_value. The
// scan starts at p->next_offset and wraps around, and next_offset is moved
// past the chosen member, presumably so that ties do not always go to the
// same member.
759 static int find_best_byvalue(jk_ws_service_t *s,
768 jk_uint64_t curmin = 0;
770 /* find the least busy worker */
775 offset = p->next_offset;
777 /* First try to see if we have available candidate */
778 for (j = offset; j < p->num_of_workers + offset; j++) {
779 i = j % p->num_of_workers;
780 wr = p->lb_workers[i];
781 activation = s->extension.activation ?
782 s->extension.activation[i] :
783 JK_LB_ACTIVATION_UNSET;
784 if (activation == JK_LB_ACTIVATION_UNSET)
785 activation = wr.activation;
787 /* Take into calculation only the workers that are
788 * not in error state, stopped, disabled or busy.
790 if (JK_WORKER_USABLE(states[wr.i], activation)) {
791 if (candidate < 0 || wr.distance < d ||
792 (wr.s->lb_value < curmin &&
795 curmin = wr.s->lb_value;
797 p->next_offset = i + 1;
// Resolve a session route to a member. It tries an exact route match
// first, then a domain match. If the chosen member cannot take sticky
// traffic (JK_WORKER_USABLE_STICKY):
//  - with sticky_session_force the lookup presumably fails;
//  - otherwise it follows the member's redirect;
//  - failing that, it tries the member's domain, unless the route was
//    already treated as a domain.
// The final candidate is checked again for sticky usability.
// NOTE(review): wr is a stack copy of the sub-worker, so s->route = wr.domain
// points into this function's frame and dangles once it returns. service()
// overwrites s->route afterwards, but take the address from
// p->lb_workers[candidate] instead.
804 static int find_bysession_route(jk_ws_service_t *s,
806 const char *session_route,
813 candidate = find_by_session(s, p, session_route, l);
816 candidate = find_best_bydomain(s, p, session_route, states, l);
818 if (candidate >= 0) {
819 lb_sub_worker_t wr = p->lb_workers[candidate];
822 s->route = wr.domain;
823 activation = s->extension.activation ?
824 s->extension.activation[candidate] :
825 JK_LB_ACTIVATION_UNSET;
826 if (activation == JK_LB_ACTIVATION_UNSET)
827 activation = wr.activation;
828 if (!JK_WORKER_USABLE_STICKY(states[wr.i], activation)) {
829 /* We have a worker that is error state or stopped.
830 * If it has a redirection set use that redirection worker.
831 * This enables to safely remove the member from the
832 * balancer. Of course you will need a some kind of
833 * session replication between those two remote.
835 if (p->sticky_session_force)
837 else if (*wr.redirect) {
838 candidate = find_by_session(s, p, wr.redirect, l);
841 else if (*wr.domain && !uses_domain) {
842 candidate = find_best_bydomain(s, p, wr.domain, states, l);
843 if (candidate >= 0) {
844 s->route = wr.domain;
849 if (candidate >= 0) {
850 wr = p->lb_workers[candidate];
851 activation = s->extension.activation ?
852 s->extension.activation[candidate] :
853 JK_LB_ACTIVATION_UNSET;
854 if (activation == JK_LB_ACTIVATION_UNSET)
855 activation = wr.activation;
856 if (!JK_WORKER_USABLE_STICKY(states[wr.i], activation))
// Take the redirect of a member that has one (presumably the first) and
// resolve it like a session route with find_bysession_route().
864 static int find_failover_worker(jk_ws_service_t *s,
871 char *redirect = NULL;
873 for (i = 0; i < p->num_of_workers; i++) {
874 if (strlen(p->lb_workers[i].redirect)) {
875 redirect = p->lb_workers[i].redirect;
880 rc = find_bysession_route(s, p, redirect, states, l);
// Choose a member for a request without a usable session route: by value
// first and, presumably only when that finds nothing, via a configured
// redirect (find_failover_worker()).
884 static int find_best_worker(jk_ws_service_t *s,
891 rc = find_best_byvalue(s, p, states, l);
892 /* By default use worker route as session route */
894 rc = find_failover_worker(s, p, states, l);
// Choose the member index for a request.
//  - A single-member balancer is used directly when it is sticky-usable and
//    not DISABLED.
//  - Otherwise the selection runs under a lock: for pessimistic locking the
//    locking statement is not shown here (presumably the shm lock); the
//    other branch enters the worker's critical section p->cs. The lock is
//    released on every exit path shown below.
//  - With a session id, each ';'-separated part is tried in turn, using
//    the text after its first '.' as the session route.
//  - If no route matched and sticky_session_force is set, the request
//    fails; otherwise find_best_worker() is used.
898 static int get_most_suitable_worker(jk_ws_service_t *s,
908 if (p->num_of_workers == 1) {
909 /* No need to find the best worker
910 * if there is a single one
912 int activation = s->extension.activation ?
913 s->extension.activation[0] :
914 JK_LB_ACTIVATION_UNSET;
915 if (activation == JK_LB_ACTIVATION_UNSET)
916 activation = p->lb_workers[0].activation;
917 if (JK_WORKER_USABLE_STICKY(states[0], activation)) {
918 if (activation != JK_LB_ACTIVATION_DISABLED) {
928 if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
931 JK_ENTER_CS(&(p->cs), r);
934 jk_log(l, JK_LOG_ERROR,
935 "locking failed (errno=%d)",
939 char *session = sessionid;
941 char *next = strchr(sessionid, ';');
942 char *session_route = NULL;
945 if (JK_IS_DEBUG_LEVEL(l))
946 jk_log(l, JK_LOG_DEBUG,
947 "searching worker for partial sessionid %s",
949 session_route = strchr(sessionid, '.');
953 if (JK_IS_DEBUG_LEVEL(l))
954 jk_log(l, JK_LOG_DEBUG,
955 "searching worker for session route %s",
958 /* We have a session route. Whow! */
959 rc = find_bysession_route(s, p, session_route, states, l);
961 lb_sub_worker_t *wr = &(p->lb_workers[rc]);
962 if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
965 JK_LEAVE_CS(&(p->cs), r);
967 if (JK_IS_DEBUG_LEVEL(l))
968 jk_log(l, JK_LOG_DEBUG,
969 "found worker %s (%s) for route %s and partial sessionid %s",
970 wr->name, wr->route, session_route, sessionid);
975 /* Try next partial sessionid if present */
979 if (rc < 0 && p->sticky_session_force) {
980 if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
983 JK_LEAVE_CS(&(p->cs), r);
985 jk_log(l, JK_LOG_INFO,
986 "all workers are in error state for session %s",
992 rc = find_best_worker(s, p, states, l);
993 if (p->lblock == JK_LB_LOCK_PESSIMISTIC)
996 JK_LEAVE_CS(&(p->cs), r);
999 lb_sub_worker_t *wr = &(p->lb_workers[rc]);
1000 if (JK_IS_DEBUG_LEVEL(l))
1001 jk_log(l, JK_LOG_DEBUG,
1002 "found best worker %s (%s) using method '%s'",
1003 wr->name, wr->route, jk_lb_get_method(p, l));
// Publish nine values of member w as request log notes under log_names:
// name, lb_value, AJP used, readed and transferred counts, errors, AJP busy,
// activation and state. Numeric values are formatted into consecutive
// JK_LB_UINT64_STR_SZ-byte slots of one pool buffer. Nothing is published
// if either pool allocation fails.
// NOTE(review): buf holds characters but is sized with sizeof(char *), so
// it over-allocates by that factor; sizeof(char) (i.e. 1) is sufficient.
1011 static void lb_add_log_items(jk_ws_service_t *s,
1012 const char *const *log_names,
1016 ajp_worker_t *aw = (ajp_worker_t *)w->worker->worker_private;
1017 const char **log_values = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT);
1018 char *buf = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT * JK_LB_UINT64_STR_SZ);
1019 if (log_values && buf) {
1020 /* JK_NOTE_LB_FIRST/LAST_NAME */
1021 log_values[0] = w->name;
1022 snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->lb_value);
1023 /* JK_NOTE_LB_FIRST/LAST_VALUE */
1024 log_values[1] = buf;
1025 buf += JK_LB_UINT64_STR_SZ;
1026 snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->used);
1027 /* JK_NOTE_LB_FIRST/LAST_ACCESSED */
1028 log_values[2] = buf;
1029 buf += JK_LB_UINT64_STR_SZ;
1030 snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->readed);
1031 /* JK_NOTE_LB_FIRST/LAST_READ */
1032 log_values[3] = buf;
1033 buf += JK_LB_UINT64_STR_SZ;
1034 snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, aw->s->transferred);
1035 /* JK_NOTE_LB_FIRST/LAST_TRANSFERRED */
1036 log_values[4] = buf;
1037 buf += JK_LB_UINT64_STR_SZ;
1038 snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT32_T_FMT, w->s->errors);
1039 /* JK_NOTE_LB_FIRST/LAST_ERRORS */
1040 log_values[5] = buf;
1041 buf += JK_LB_UINT64_STR_SZ;
1042 snprintf(buf, JK_LB_UINT64_STR_SZ, "%d", aw->s->busy);
1043 /* JK_NOTE_LB_FIRST/LAST_BUSY */
1044 log_values[6] = buf;
1045 /* JK_NOTE_LB_FIRST/LAST_ACTIVATION */
1046 log_values[7] = jk_lb_get_activation(w, l);
1047 /* JK_NOTE_LB_FIRST/LAST_STATE */
1048 log_values[8] = jk_lb_get_state(w, l);
1049 s->add_log_items(s, log_names, log_values, JK_LB_NOTES_COUNT);
1053 static int JK_METHOD service(jk_endpoint_t *e,
1055 jk_logger_t *l, int *is_error)
1059 lb_sub_worker_t *prec = NULL;
1063 int recoverable = JK_TRUE;
1065 char *sessionid = NULL;
1071 if (!e || !e->endpoint_private || !s || !is_error) {
1072 JK_LOG_NULL_PARAMS(l);
1074 *is_error = JK_HTTP_SERVER_ERROR;
1079 p = e->endpoint_private;
1080 num_of_workers = p->worker->num_of_workers;
1082 /* Set returned error to OK */
1083 *is_error = JK_HTTP_OK;
1085 if (p->worker->sequence != p->worker->s->h.sequence)
1086 jk_lb_pull(p->worker, JK_FALSE, l);
1087 for (i = 0; i < num_of_workers; i++) {
1088 lb_sub_worker_t *rec = &(p->worker->lb_workers[i]);
1089 if (rec->s->state == JK_LB_STATE_BUSY) {
1090 if (ajp_has_endpoint(rec->worker, l)) {
1091 if (JK_IS_DEBUG_LEVEL(l))
1092 jk_log(l, JK_LOG_DEBUG,
1093 "worker %s busy count fixed",
1095 rec->s->state = JK_LB_STATE_OK;
1098 /* Copy the shared state info */
1099 p->states[i] = rec->s->state;
1102 /* set the recovery post, for LB mode */
1103 s->reco_buf = jk_b_new(s->pool);
1105 *is_error = JK_HTTP_SERVER_ERROR;
1106 jk_log(l, JK_LOG_ERROR,
1107 "Failed allocating AJP message");
1109 return JK_SERVER_ERROR;
1111 if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) {
1112 *is_error = JK_HTTP_SERVER_ERROR;
1113 jk_log(l, JK_LOG_ERROR,
1114 "Failed allocating AJP message buffer");
1116 return JK_SERVER_ERROR;
1118 jk_b_reset(s->reco_buf);
1119 s->reco_status = RECO_INITED;
1121 if (p->worker->sticky_session) {
1122 /* Use sessionid only if sticky_session is
1123 * defined for this load balancer
1125 sessionid = get_sessionid(s, p->worker, l);
1127 if (JK_IS_DEBUG_LEVEL(l))
1128 jk_log(l, JK_LOG_DEBUG,
1129 "service sticky_session=%d id='%s'",
1130 p->worker->sticky_session, sessionid ? sessionid : "empty");
1132 while (recoverable == JK_TRUE) {
1133 if (attempt >= num_of_workers) {
1135 if (retry >= p->worker->retries) {
1136 /* Done with retrying */
1139 if (JK_IS_DEBUG_LEVEL(l))
1140 jk_log(l, JK_LOG_DEBUG,
1141 "retry %d, sleeping for %d ms before retrying",
1142 retry, p->worker->retry_interval);
1143 jk_sleep(p->worker->retry_interval);
1144 /* Pull shared memory if something changed during sleep */
1145 if (p->worker->sequence != p->worker->s->h.sequence)
1146 jk_lb_pull(p->worker, JK_FALSE, l);
1147 for (i = 0; i < num_of_workers; i++) {
1148 /* Copy the shared state info */
1149 p->states[i] = p->worker->lb_workers[i].s->state;
1154 *is_error = JK_HTTP_SERVER_BUSY;
1155 i = get_most_suitable_worker(s, p->worker, sessionid, p->states, l);
1158 int is_service_error = JK_HTTP_OK;
1159 lb_sub_worker_t *rec = &(p->worker->lb_workers[i]);
1160 ajp_worker_t *aw = (ajp_worker_t *)rec->worker->worker_private;
1161 jk_endpoint_t *end = NULL;
1162 int activation = s->extension.activation ?
1163 s->extension.activation[i] :
1164 JK_LB_ACTIVATION_UNSET;
1165 if (activation == JK_LB_ACTIVATION_UNSET)
1166 activation = rec->activation;
1168 s->route = rec->route;
1169 s->activation = jk_lb_get_activation_direct(activation, l);
1172 if (JK_IS_DEBUG_LEVEL(l))
1173 jk_log(l, JK_LOG_DEBUG,
1174 "service worker=%s route=%s",
1175 rec->name, s->route);
1177 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1179 if (rec->s->state == JK_LB_STATE_RECOVER) {
1180 rec->s->state = JK_LB_STATE_PROBE;
1181 p->states[rec->i] = JK_LB_STATE_PROBE;
1183 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1186 r = rec->worker->get_endpoint(rec->worker, &end, l);
1188 /* If we can not get the endpoint
1189 * mark the worker as busy rather then
1190 * as in error if the retry number is
1191 * greater then the number of retries.
1193 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1195 if (rec->s->state != JK_LB_STATE_ERROR) {
1196 rec->s->state = JK_LB_STATE_BUSY;
1197 p->states[rec->i] = JK_LB_STATE_BUSY;
1199 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1201 jk_log(l, JK_LOG_INFO,
1202 "could not get free endpoint for worker %s (%d retries)",
1206 int service_stat = JK_UNSET;
1209 /* Reset endpoint read and write sizes for
1212 end->rd = end->wr = 0;
1213 end->recoverable = JK_TRUE;
1214 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1217 /* Increment the number of workers serving request */
1218 p->worker->s->busy++;
1220 if (p->worker->s->busy > p->worker->s->max_busy)
1221 p->worker->s->max_busy = p->worker->s->busy;
1222 if ( (p->worker->lbmethod == JK_LB_METHOD_REQUESTS) ||
1223 (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) ||
1224 (p->worker->lbmethod == JK_LB_METHOD_SESSIONS &&
1226 rec->s->lb_value += rec->lb_mult;
1227 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1230 service_stat = end->service(end, s, l, &is_service_error);
1233 recoverable = end->recoverable;
1234 *is_error = is_service_error;
1237 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1240 /* Update partial reads and writes if any */
1241 if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) {
1242 rec->s->lb_value += (rd+wr)*rec->lb_mult;
1244 else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) {
1245 if (rec->s->lb_value >= rec->lb_mult) {
1246 rec->s->lb_value -= rec->lb_mult;
1249 rec->s->lb_value = 0;
1250 if (JK_IS_DEBUG_LEVEL(l)) {
1251 jk_log(l, JK_LOG_DEBUG,
1252 "worker %s has load value to low (%"
1257 "- correcting to 0",
1265 /* When have an endpoint and we ran a request, assume
1266 * we are OK, unless we last were in error.
1267 * We will below explicitely set OK or ERROR according
1268 * to the returned service_stat.
1270 if (rec->s->state != JK_LB_STATE_ERROR) {
1271 rec->s->state = JK_LB_STATE_OK;
1272 p->states[rec->i] = JK_LB_STATE_OK;
1274 /* Decrement the busy worker count.
1275 * Check if the busy was reset to zero by graceful
1276 * restart of the server.
1278 if (p->worker->s->busy)
1279 p->worker->s->busy--;
1282 if (service_stat == JK_TRUE) {
1284 * Successful request.
1286 rec->s->state = JK_LB_STATE_OK;
1287 p->states[rec->i] = JK_LB_STATE_OK;
1288 rec->s->error_time = 0;
1290 recoverable = JK_UNSET;
1292 else if (service_stat == JK_CLIENT_ERROR) {
1295 * Since this is bad request do not fail over.
1297 rec->s->state = JK_LB_STATE_OK;
1298 p->states[rec->i] = JK_LB_STATE_ERROR;
1299 rec->s->error_time = 0;
1300 rc = JK_CLIENT_ERROR;
1301 recoverable = JK_FALSE;
1303 else if (service_stat == JK_SERVER_ERROR) {
1305 * Internal JK server error.
1306 * Keep previous global state.
1307 * Do not try to reuse the same node for the same request.
1308 * Failing over to another node could help.
1310 p->states[rec->i] = JK_LB_STATE_ERROR;
1313 else if (service_stat == JK_AJP_PROTOCOL_ERROR) {
1315 * We've received a bad AJP message from the backend.
1316 * Keep previous global state.
1317 * Do not try to reuse the same node for the same request.
1318 * Failing over to another node could help.
1320 p->states[rec->i] = JK_LB_STATE_ERROR;
1323 else if (service_stat == JK_STATUS_ERROR) {
1325 * Status code configured as service is down.
1327 * Do not try to reuse the same node for the same request.
1328 * Failing over to another node could help.
1330 rec->s->state = JK_LB_STATE_OK;
1331 p->states[rec->i] = JK_LB_STATE_ERROR;
1332 rec->s->error_time = 0;
1335 else if (service_stat == JK_STATUS_FATAL_ERROR) {
1337 * Status code configured as service is down.
1338 * Mark the node as bad.
1339 * Do not try to reuse the same node for the same request.
1340 * Failing over to another node could help.
1343 rec->s->state = JK_LB_STATE_ERROR;
1344 p->states[rec->i] = JK_LB_STATE_ERROR;
1345 rec->s->error_time = time(NULL);
1348 else if (service_stat == JK_REPLY_TIMEOUT) {
1349 if (aw->s->reply_timeouts > (unsigned)p->worker->max_reply_timeouts) {
1351 * Service failed - to many reply timeouts
1352 * Mark the node as bad.
1353 * Do not try to reuse the same node for the same request.
1354 * Failing over to another node could help.
1357 rec->s->state = JK_LB_STATE_ERROR;
1358 p->states[rec->i] = JK_LB_STATE_ERROR;
1359 rec->s->error_time = time(NULL);
1363 * Reply timeout, bot not yet too many of them.
1364 * Keep previous global state.
1365 * Do not try to reuse the same node for the same request.
1366 * Failing over to another node could help.
1368 p->states[rec->i] = JK_LB_STATE_ERROR;
1374 * Various unspecific error cases.
1375 * Keep previous global state, if we are not in local error since to long.
1376 * Do not try to reuse the same node for the same request.
1377 * Failing over to another node could help.
1379 time_t now = time(NULL);
1381 if (rec->s->busy == 0 ||
1382 p->worker->error_escalation_time == 0 ||
1383 (rec->s->error_time > 0 &&
1384 (int)difftime(now, rec->s->error_time) >= p->worker->error_escalation_time)) {
1385 if (JK_IS_DEBUG_LEVEL(l))
1386 jk_log(l, JK_LOG_DEBUG,
1387 "worker %s escalating local error to global error",
1389 rec->s->state = JK_LB_STATE_ERROR;
1391 p->states[rec->i] = JK_LB_STATE_ERROR;
1392 if (rec->s->error_time == 0) {
1393 rec->s->error_time = now;
1397 if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)
1399 if (p->states[rec->i] == JK_LB_STATE_ERROR)
1400 jk_log(l, JK_LOG_INFO,
1401 "service failed, worker %s is in %serror state",
1403 rec->s->state == JK_LB_STATE_ERROR ? "" : "local ");
1405 if (recoverable == JK_TRUE) {
1407 * Error is recoverable by submitting the request to
1408 * another worker... Lets try to do that.
1410 if (JK_IS_DEBUG_LEVEL(l))
1411 jk_log(l, JK_LOG_DEBUG,
1412 "recoverable error... will try to recover on other worker");
1416 * Error is not recoverable - break with an error.
1418 if (rc == JK_CLIENT_ERROR)
1419 jk_log(l, JK_LOG_INFO,
1420 "unrecoverable error %d, request failed."
1421 " Client failed in the middle of request,"
1422 " we can't recover to another instance.",
1424 else if (rc != JK_TRUE)
1425 jk_log(l, JK_LOG_ERROR,
1426 "unrecoverable error %d, request failed."
1427 " Tomcat failed in the middle of request,"
1428 " we can't recover to another instance.",
1431 if (first == 1 && s->add_log_items) {
1433 lb_add_log_items(s, lb_first_log_names, prec, l);
1437 /* No more workers left ... */
1440 /* Force recovery only once.
1441 * If it still fails, Tomcat is still disconnected.
1444 nf = force_recovery(p->worker, p->states, l);
1448 /* We have forced recovery.
1449 * Reset the service loop and go again
1452 jk_log(l, JK_LOG_INFO,
1453 "Forcing recovery once for %d workers", nf);
1457 /* No workers in error state.
1458 * Somebody set them all to disabled?
1460 jk_log(l, JK_LOG_INFO,
1461 "All tomcat instances failed, no more workers "
1462 "left for recovery (attempt=%d, retry=%d)",
1464 *is_error = JK_HTTP_SERVER_BUSY;
1469 jk_log(l, JK_LOG_INFO,
1470 "All tomcat instances failed, no more workers "
1471 "left (attempt=%d, retry=%d)",
1473 *is_error = JK_HTTP_SERVER_BUSY;
1479 if (recoverable == JK_TRUE) {
1480 jk_log(l, JK_LOG_INFO,
1481 "All tomcat instances are busy or in error state");
1482 /* rc and http error must be set above */
1484 if (rc == JK_FALSE) {
1485 jk_log(l, JK_LOG_ERROR,
1486 "All tomcat instances failed, no more workers left");
1488 if (prec && s->add_log_items) {
1489 lb_add_log_items(s, lb_last_log_names, prec, l);
/*
 * endpoint->done callback of the load balancer: releases an endpoint that
 * was handed out by get_endpoint() (which installs this function as
 * endpoint.done and stores the lb_endpoint_t in endpoint_private).
 *
 * When e, *e and (*e)->endpoint_private are all non-NULL, the private
 * lb_endpoint_t is recovered from endpoint_private for cleanup.
 * NOTE(review): presumably p->states and p (both malloc'ed in
 * get_endpoint) are freed and *e is cleared here - TODO confirm.
 * Otherwise the NULL parameters are logged via JK_LOG_NULL_PARAMS.
 */
1496 static int JK_METHOD done(jk_endpoint_t **e, jk_logger_t *l)
1500 if (e && *e && (*e)->endpoint_private) {
1501 lb_endpoint_t *p = (*e)->endpoint_private;
1509 JK_LOG_NULL_PARAMS(l);
/*
 * worker->validate callback: configure the load balancer from the worker
 * properties and create its balanced member (sub) workers.
 *
 * Visible steps:
 *  - read sticky_session, sticky_session_force and the lb-level secret;
 *  - fetch the member name list; members are only set up when that lookup
 *    succeeds and yields at least one name;
 *  - allocate the lb_workers array from the worker pool, zero it, and
 *    attach one shared-memory record to each member (an allocation
 *    failure is logged as an error);
 *  - per member: record its index, copy its name (locally and into the
 *    shared record), reset both sequence counters, clamp lbfactor to at
 *    least 1, raise max_packet_size (starting from DEF_BUFFER_SZ) to the
 *    largest member packet size, read distance, route (defaulting to the
 *    member name), domain, redirect and activation, reset the shared
 *    lb_value, state (JK_LB_STATE_IDLE) and error_time, then create the
 *    member worker with wc_create_worker();
 *  - for AJP13/AJP14 members, push the lb-level secret (if set) down to
 *    the member and force activation to JK_LB_ACTIVATION_STOPPED when the
 *    member's port is 0;
 *  - if the creation loop did not reach every member, log the failing
 *    member name and close the members created so far via
 *    close_workers(p, i, l);
 *  - give members without an explicit domain a domain derived from a
 *    route that contains '.' (NOTE(review): how the copied route is
 *    truncated at the '.' is not visible here - confirm);
 *  - store num_of_workers and reject any two members that share a route
 *    ("aborting configuration").
 *
 * If pThis or its worker_private is NULL the NULL parameters are logged.
 * NOTE(review): the success/failure return values are not visible in this
 * view; presumably JK_TRUE/JK_FALSE like the other callbacks - confirm.
 * NOTE(review): strncpy(..., JK_SHM_STR_SIZ) does not NUL-terminate a
 * source of JK_SHM_STR_SIZ or more characters. For the lb_workers fields
 * the memset of the array supplies the terminator only if those arrays
 * are larger than JK_SHM_STR_SIZ; the shared-memory name is not zeroed
 * here - confirm the declared field sizes.
 */
1514 static int JK_METHOD validate(jk_worker_t *pThis,
1516 jk_worker_env_t *we, jk_logger_t *l)
1520 if (pThis && pThis->worker_private) {
1521 lb_worker_t *p = pThis->worker_private;
1522 char **worker_names;
1523 unsigned int num_of_workers;
1526 p->sticky_session = jk_get_is_sticky_session(props, p->name);
1527 p->sticky_session_force = jk_get_is_sticky_session_force(props, p->name);
1528 secret = jk_get_worker_secret(props, p->name);
1530 if (jk_get_lb_worker_list(props,
1533 &num_of_workers) && num_of_workers) {
1536 p->max_packet_size = DEF_BUFFER_SZ;
1537 p->lb_workers = jk_pool_alloc(&p->p,
1539 sizeof(lb_sub_worker_t));
1540 if (!p->lb_workers) {
1544 memset(p->lb_workers, 0, num_of_workers * sizeof(lb_sub_worker_t));
1545 for (i = 0; i < num_of_workers; i++) {
1546 p->lb_workers[i].s = jk_shm_alloc_lb_sub_worker(&p->p);
1547 if (p->lb_workers[i].s == NULL) {
1548 jk_log(l, JK_LOG_ERROR,
1549 "allocating lb sub worker record from shared memory");
1555 for (i = 0; i < num_of_workers; i++) {
1559 p->lb_workers[i].i = i;
1560 strncpy(p->lb_workers[i].name, worker_names[i],
1562 strncpy(p->lb_workers[i].s->h.name, worker_names[i],
1564 p->lb_workers[i].sequence = 0;
1565 p->lb_workers[i].s->h.sequence = 0;
1566 p->lb_workers[i].lb_factor =
1567 jk_get_lb_factor(props, worker_names[i]);
1568 if (p->lb_workers[i].lb_factor < 1) {
1569 p->lb_workers[i].lb_factor = 1;
1571 /* Calculate the maximum packet size from all workers
1572 * for the recovery buffer.
1574 ms = jk_get_max_packet_size(props, worker_names[i]);
1575 if (ms > p->max_packet_size)
1576 p->max_packet_size = ms;
1577 p->lb_workers[i].distance =
1578 jk_get_distance(props, worker_names[i]);
1579 if ((s = jk_get_worker_route(props, worker_names[i], NULL)))
1580 strncpy(p->lb_workers[i].route, s, JK_SHM_STR_SIZ);
1582 strncpy(p->lb_workers[i].route, worker_names[i], JK_SHM_STR_SIZ);
1583 if ((s = jk_get_worker_domain(props, worker_names[i], NULL)))
1584 strncpy(p->lb_workers[i].domain, s, JK_SHM_STR_SIZ);
1585 if ((s = jk_get_worker_redirect(props, worker_names[i], NULL)))
1586 strncpy(p->lb_workers[i].redirect, s, JK_SHM_STR_SIZ);
1588 p->lb_workers[i].s->lb_value = 0;
1589 p->lb_workers[i].s->state = JK_LB_STATE_IDLE;
1590 p->lb_workers[i].s->error_time = 0;
1591 p->lb_workers[i].activation =
1592 jk_get_worker_activation(props, worker_names[i]);
1593 if (!wc_create_worker(p->lb_workers[i].name, 0,
1595 &(p->lb_workers[i].worker),
1596 we, l) || !p->lb_workers[i].worker) {
1599 if (secret && (p->lb_workers[i].worker->type == JK_AJP13_WORKER_TYPE ||
1600 p->lb_workers[i].worker->type == JK_AJP14_WORKER_TYPE)) {
1601 ajp_worker_t *aw = (ajp_worker_t *)p->lb_workers[i].worker->worker_private;
1603 aw->secret = secret;
1605 if (p->lb_workers[i].worker->type == JK_AJP13_WORKER_TYPE ||
1606 p->lb_workers[i].worker->type == JK_AJP14_WORKER_TYPE) {
1607 ajp_worker_t *aw = (ajp_worker_t *)p->lb_workers[i].worker->worker_private;
1608 if (aw->port == 0) {
1609 p->lb_workers[i].activation = JK_LB_ACTIVATION_STOPPED;
1614 if (i != num_of_workers) {
1615 jk_log(l, JK_LOG_ERROR,
1616 "Failed creating worker %s",
1617 p->lb_workers[i].name);
1618 close_workers(p, i, l);
1621 /* Update domain names if route contains period '.' */
1622 for (i = 0; i < num_of_workers; i++) {
1623 if (!p->lb_workers[i].domain[0]) {
1624 char *id_domain = strchr(p->lb_workers[i].route, '.');
1627 strcpy(p->lb_workers[i].domain, p->lb_workers[i].route);
1631 if (JK_IS_DEBUG_LEVEL(l)) {
1632 jk_log(l, JK_LOG_DEBUG,
1633 "Balanced worker %i has name %s and route %s in domain %s",
1635 p->lb_workers[i].name,
1636 p->lb_workers[i].route,
1637 p->lb_workers[i].domain);
1640 p->num_of_workers = num_of_workers;
1642 for (i = 0; i < num_of_workers; i++) {
1643 for (j = 0; j < i; j++) {
1644 if (strcmp(p->lb_workers[i].route, p->lb_workers[j].route) == 0) {
1645 jk_log(l, JK_LOG_ERROR,
1646 "Balanced workers number %i (%s) and %i (%s) share the same route %s - aborting configuration!",
1648 p->lb_workers[i].name,
1650 p->lb_workers[j].name,
1651 p->lb_workers[i].route);
1663 JK_LOG_NULL_PARAMS(l);
/*
 * worker->init callback: read the runtime tuning parameters of the load
 * balancer from the worker properties and fill in the shared record.
 *
 * Reads:
 *  - retries and the retry interval;
 *  - recover_wait_time (default WAIT_BEFORE_RECOVER, clamped to >= 1);
 *  - error_escalation_time (default: half of recover_wait_time);
 *  - max_reply_timeouts;
 *  - the global maintain_time (clamped to >= 0);
 *  - lbmethod and lblock;
 *  - the session cookie and path names (defaults JK_SESSION_IDENTIFIER
 *    and JK_PATH_SESSION_IDENTIFIER).
 *
 * The shared last_maintain_time and last_reset timestamps are both set to
 * the current time, and the cookie/path names are copied into the shared
 * record. The worker critical section is then initialised; a failure is
 * logged together with errno. Finally jk_lb_push(p, JK_FALSE, log) is
 * called.
 *
 * NOTE(review): unlike validate/destroy/get_endpoint, pThis and
 * pThis->worker_private are dereferenced without a NULL check (and before
 * JK_TRACE_ENTER).
 */
1668 static int JK_METHOD init(jk_worker_t *pThis,
1670 jk_worker_env_t *we, jk_logger_t *log)
1674 lb_worker_t *p = (lb_worker_t *)pThis->worker_private;
1675 JK_TRACE_ENTER(log);
1678 p->retries = jk_get_worker_retries(props, p->name,
1681 jk_get_worker_retry_interval(props, p->name,
1683 p->recover_wait_time = jk_get_worker_recover_timeout(props, p->name,
1684 WAIT_BEFORE_RECOVER);
1685 if (p->recover_wait_time < 1)
1686 p->recover_wait_time = 1;
1687 p->error_escalation_time = jk_get_worker_error_escalation_time(props, p->name,
1688 p->recover_wait_time / 2);
1689 p->max_reply_timeouts = jk_get_worker_max_reply_timeouts(props, p->name,
1691 p->maintain_time = jk_get_worker_maintain_time(props);
1692 if(p->maintain_time < 0)
1693 p->maintain_time = 0;
1694 p->s->last_maintain_time = time(NULL);
1695 p->s->last_reset = p->s->last_maintain_time;
1697 p->lbmethod = jk_get_lb_method(props, p->name);
1698 p->lblock = jk_get_lb_lock(props, p->name);
1699 strncpy(p->session_cookie,
1700 jk_get_lb_session_cookie(props, p->name, JK_SESSION_IDENTIFIER),
1702 strncpy(p->session_path,
1703 jk_get_lb_session_path(props, p->name, JK_PATH_SESSION_IDENTIFIER),
1705 strcpy(p->s->session_cookie, p->session_cookie);
1706 strcpy(p->s->session_path, p->session_path);
1708 JK_INIT_CS(&(p->cs), i);
1709 if (i == JK_FALSE) {
1710 jk_log(log, JK_LOG_ERROR,
1711 "creating thread lock (errno=%d)",
1718 jk_lb_push(p, JK_FALSE, log);
1724 static int JK_METHOD get_endpoint(jk_worker_t *pThis,
1725 jk_endpoint_t **pend, jk_logger_t *l)
1729 if (pThis && pThis->worker_private && pend) {
1730 lb_endpoint_t *p = (lb_endpoint_t *) malloc(sizeof(lb_endpoint_t));
1731 p->worker = pThis->worker_private;
1732 p->endpoint.endpoint_private = p;
1733 p->endpoint.service = service;
1734 p->endpoint.done = done;
1735 p->states = (int *)malloc((p->worker->num_of_workers + 1) * sizeof(int));
1738 jk_log(l, JK_LOG_ERROR,
1739 "Failed allocating private worker state memory");
1743 *pend = &p->endpoint;
1748 JK_LOG_NULL_PARAMS(l);
/*
 * worker->destroy callback: tear down a load balancer worker.
 *
 * When pThis, *pThis and (*pThis)->worker_private are all non-NULL:
 *  - all num_of_workers member workers are closed with close_workers();
 *  - the worker critical section is deleted;
 *  - the worker's memory pool is closed (validate allocated the
 *    lb_workers array from it).
 *
 * NOTE(review): lb_worker_factory calloc()s the lb_worker_t itself;
 * presumably it is freed after the pool is closed - TODO confirm.
 * Otherwise the NULL parameters are logged via JK_LOG_NULL_PARAMS.
 */
1755 static int JK_METHOD destroy(jk_worker_t **pThis, jk_logger_t *l)
1759 if (pThis && *pThis && (*pThis)->worker_private) {
1761 lb_worker_t *private_data = (*pThis)->worker_private;
1763 close_workers(private_data, private_data->num_of_workers, l);
1764 JK_DELETE_CS(&(private_data->cs), i);
1765 jk_close_pool(&private_data->p);
1772 JK_LOG_NULL_PARAMS(l);
1777 int JK_METHOD lb_worker_factory(jk_worker_t **w,
1778 const char *name, jk_logger_t *l)
1782 if (NULL != name && NULL != w) {
1783 lb_worker_t *private_data =
1784 (lb_worker_t *) calloc(1, sizeof(lb_worker_t));
1787 jk_open_pool(&private_data->p,
1789 sizeof(jk_pool_atom_t) * TINY_POOL_SIZE);
1791 private_data->s = jk_shm_alloc_lb_worker(&private_data->p);
1792 if (!private_data->s) {
1797 strncpy(private_data->name, name, JK_SHM_STR_SIZ);
1798 strncpy(private_data->s->h.name, name, JK_SHM_STR_SIZ);
1799 private_data->lb_workers = NULL;
1800 private_data->num_of_workers = 0;
1801 private_data->worker.worker_private = private_data;
1802 private_data->worker.validate = validate;
1803 private_data->worker.init = init;
1804 private_data->worker.get_endpoint = get_endpoint;
1805 private_data->worker.destroy = destroy;
1806 private_data->worker.maintain = maintain_workers;
1807 private_data->recover_wait_time = WAIT_BEFORE_RECOVER;
1808 private_data->error_escalation_time = private_data->recover_wait_time / 2;
1809 private_data->max_reply_timeouts = 0;
1810 private_data->sequence = 0;
1811 private_data->s->h.sequence = 0;
1812 private_data->next_offset = 0;
1813 *w = &private_data->worker;
1815 return JK_LB_WORKER_TYPE;
1818 JK_LOG_NULL_PARAMS(l);