X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=rubbos%2Fapp%2Ftomcat-connectors-1.2.32-src%2Fnative%2Fcommon%2Fjk_ajp_common.c;fp=rubbos%2Fapp%2Ftomcat-connectors-1.2.32-src%2Fnative%2Fcommon%2Fjk_ajp_common.c;h=7b0832641536939176e010a6b7c4685461ef56aa;hb=9401f816dd0d9d550fe98a8507224bde51c4b847;hp=0000000000000000000000000000000000000000;hpb=e8ec7aa8e38a93f5b034ac74cebce5de23710317;p=bottlenecks.git diff --git a/rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_ajp_common.c b/rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_ajp_common.c new file mode 100644 index 00000000..7b083264 --- /dev/null +++ b/rubbos/app/tomcat-connectors-1.2.32-src/native/common/jk_ajp_common.c @@ -0,0 +1,3383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*************************************************************************** + * Description: common stuff for bi-directional protocols ajp13/ajp14. * + * Author: Gal Shachor * + * Author: Henri Gomez * + * Version: $Revision: 1137200 $ * + ***************************************************************************/ + + +#include "jk_global.h" +#include "jk_util.h" +#include "jk_ajp13.h" +#include "jk_ajp14.h" +#include "jk_ajp_common.h" +#include "jk_connect.h" +#if defined(AS400) && !defined(AS400_UTF8) +#include "util_ebcdic.h" +#endif +#if defined(NETWARE) && defined(__NOVELL_LIBC__) +#include "novsock2.h" +#endif + +const char *response_trans_headers[] = { + "Content-Type", + "Content-Language", + "Content-Length", + "Date", + "Last-Modified", + "Location", + "Set-Cookie", + "Set-Cookie2", + "Servlet-Engine", + "Status", + "WWW-Authenticate" +}; + +static const char *long_res_header_for_sc(int sc) +{ + const char *rc = NULL; + sc = sc & 0X00FF; + if (sc <= SC_RES_HEADERS_NUM && sc > 0) { + rc = response_trans_headers[sc - 1]; + } + + return rc; +} + +static const char *ajp_state_type[] = { + JK_AJP_STATE_TEXT_IDLE, + JK_AJP_STATE_TEXT_OK, + JK_AJP_STATE_TEXT_ERROR, + JK_AJP_STATE_TEXT_PROBE, + "unknown", + NULL +}; + +#define UNKNOWN_METHOD (-1) + +static int sc_for_req_method(const char *method, size_t len) +{ + /* Note: the following code was generated by the "shilka" tool from + the "cocom" parsing/compilation toolkit. It is an optimized lookup + based on analysis of the input keywords. Postprocessing was done + on the shilka output, but the basic structure and analysis is + from there. Should new HTTP methods be added, then manual insertion + into this code is fine, or simply re-running the shilka tool on + the appropriate input. */ + + /* Note: it is also quite reasonable to just use our method_registry, + but I'm assuming (probably incorrectly) we want more speed here + (based on the optimizations the previous code was doing). */ + + switch (len) + { + case 3: + switch (method[0]) + { + case 'A': + return (method[1] == 'C' + && method[2] == 'L' + ? SC_M_ACL : UNKNOWN_METHOD); + case 'P': + return (method[1] == 'U' + && method[2] == 'T' + ? SC_M_PUT : UNKNOWN_METHOD); + case 'G': + return (method[1] == 'E' + && method[2] == 'T' + ? SC_M_GET : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 4: + switch (method[0]) + { + case 'H': + return (method[1] == 'E' + && method[2] == 'A' + && method[3] == 'D' + ? SC_M_HEAD : UNKNOWN_METHOD); + case 'P': + return (method[1] == 'O' + && method[2] == 'S' + && method[3] == 'T' + ? SC_M_POST : UNKNOWN_METHOD); + case 'M': + return (method[1] == 'O' + && method[2] == 'V' + && method[3] == 'E' + ? SC_M_MOVE : UNKNOWN_METHOD); + case 'L': + return (method[1] == 'O' + && method[2] == 'C' + && method[3] == 'K' + ? SC_M_LOCK : UNKNOWN_METHOD); + case 'C': + return (method[1] == 'O' + && method[2] == 'P' + && method[3] == 'Y' + ? SC_M_COPY : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 5: + switch (method[2]) + { + case 'R': + return (memcmp(method, "MERGE", 5) == 0 + ? SC_M_MERGE : UNKNOWN_METHOD); + case 'C': + return (memcmp(method, "MKCOL", 5) == 0 + ? SC_M_MKCOL : UNKNOWN_METHOD); + case 'B': + return (memcmp(method, "LABEL", 5) == 0 + ? SC_M_LABEL : UNKNOWN_METHOD); + case 'A': + return (memcmp(method, "TRACE", 5) == 0 + ? SC_M_TRACE : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 6: + switch (method[0]) + { + case 'U': + switch (method[5]) + { + case 'K': + return (memcmp(method, "UNLOCK", 6) == 0 + ? SC_M_UNLOCK : UNKNOWN_METHOD); + case 'E': + return (memcmp(method, "UPDATE", 6) == 0 + ? SC_M_UPDATE : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + case 'R': + return (memcmp(method, "REPORT", 6) == 0 + ? SC_M_REPORT : UNKNOWN_METHOD); + case 'S': + return (memcmp(method, "SEARCH", 6) == 0 + ? SC_M_SEARCH : UNKNOWN_METHOD); + case 'D': + return (memcmp(method, "DELETE", 6) == 0 + ? SC_M_DELETE : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 7: + switch (method[1]) + { + case 'P': + return (memcmp(method, "OPTIONS", 7) == 0 + ? SC_M_OPTIONS : UNKNOWN_METHOD); + case 'H': + return (memcmp(method, "CHECKIN", 7) == 0 + ? SC_M_CHECKIN : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 8: + switch (method[0]) + { + case 'P': + return (memcmp(method, "PROPFIND", 8) == 0 + ? SC_M_PROPFIND : UNKNOWN_METHOD); + case 'C': + return (memcmp(method, "CHECKOUT", 8) == 0 + ? SC_M_CHECKOUT : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 9: + return (memcmp(method, "PROPPATCH", 9) == 0 + ? SC_M_PROPPATCH : UNKNOWN_METHOD); + + case 10: + switch (method[0]) + { + case 'U': + return (memcmp(method, "UNCHECKOUT", 10) == 0 + ? SC_M_UNCHECKOUT : UNKNOWN_METHOD); + case 'M': + return (memcmp(method, "MKACTIVITY", 10) == 0 + ? SC_M_MKACTIVITY : UNKNOWN_METHOD); + default: + return UNKNOWN_METHOD; + } + + case 11: + return (memcmp(method, "MKWORKSPACE", 11) == 0 + ? SC_M_MKWORKSPACE : UNKNOWN_METHOD); + + case 15: + return (memcmp(method, "VERSION-CONTROL", 15) == 0 + ? SC_M_VERSION_CONTROL : UNKNOWN_METHOD); + + case 16: + return (memcmp(method, "BASELINE-CONTROL", 16) == 0 + ? SC_M_BASELINE_CONTROL : UNKNOWN_METHOD); + + default: + return UNKNOWN_METHOD; + } + + /* NOTREACHED */ +} + +static int sc_for_req_header(const char *header_name) +{ + char header[16]; + size_t len = strlen(header_name); + const char *p = header_name; + int i = 0; + + /* ACCEPT-LANGUAGE is the longest header + * that is of interest. + */ + if (len < 4 || len > 15) + return UNKNOWN_METHOD; + + while (*p) { + header[i++] = toupper((unsigned char)*p); + p++; + } + + header[i] = '\0'; + p = &header[1]; + +/* Always do memcmp including the final \0-termination character. + */ + switch (header[0]) { + case 'A': + if (memcmp(p, "CCEPT", 6) == 0) { + if (!header[6]) + return SC_ACCEPT; + else if (header[6] == '-') { + p += 6; + if (memcmp(p, "CHARSET", 8) == 0) + return SC_ACCEPT_CHARSET; + else if (memcmp(p, "ENCODING", 9) == 0) + return SC_ACCEPT_ENCODING; + else if (memcmp(p, "LANGUAGE", 9) == 0) + return SC_ACCEPT_LANGUAGE; + else + return UNKNOWN_METHOD; + } + else + return UNKNOWN_METHOD; + } + else if (memcmp(p, "UTHORIZATION", 13) == 0) + return SC_AUTHORIZATION; + else + return UNKNOWN_METHOD; + break; + case 'C': + if(memcmp(p, "OOKIE2", 7) == 0) + return SC_COOKIE2; + else if (memcmp(p, "OOKIE", 6) == 0) + return SC_COOKIE; + else if(memcmp(p, "ONNECTION", 10) == 0) + return SC_CONNECTION; + else if(memcmp(p, "ONTENT-TYPE", 12) == 0) + return SC_CONTENT_TYPE; + else if(memcmp(p, "ONTENT-LENGTH", 14) == 0) + return SC_CONTENT_LENGTH; + else + return UNKNOWN_METHOD; + break; + case 'H': + if(memcmp(p, "OST", 4) == 0) + return SC_HOST; + else + return UNKNOWN_METHOD; + break; + case 'P': + if(memcmp(p, "RAGMA", 6) == 0) + return SC_PRAGMA; + else + return UNKNOWN_METHOD; + break; + case 'R': + if(memcmp(p, "EFERER", 7) == 0) + return SC_REFERER; + else + return UNKNOWN_METHOD; + break; + case 'U': + if(memcmp(p, "SER-AGENT", 10) == 0) + return SC_USER_AGENT; + else + return UNKNOWN_METHOD; + break; + default: + return UNKNOWN_METHOD; + } + /* NOTREACHED */ +} + +/* Return the string representation of the worker state */ +const char *jk_ajp_get_state(ajp_worker_t *aw, jk_logger_t *l) +{ + return ajp_state_type[aw->s->state]; +} + +/* Return the int representation of the worker state */ +int jk_ajp_get_state_code(const char *v) +{ + if (!v) + return JK_AJP_STATE_DEF; + else if (*v == 'i' || *v == 'I' || *v == 'n' || *v == 'N' || *v == '0') + return JK_AJP_STATE_IDLE; + else if (*v == 'o' || *v == 'O' || *v == '1') + return JK_AJP_STATE_OK; + else if (*v == 'e' || *v == 'E' || *v == '4') + return JK_AJP_STATE_ERROR; + else if (*v == 'p' || *v == 'P' || *v == '6') + return JK_AJP_STATE_PROBE; + else + return JK_AJP_STATE_DEF; +} + +int jk_ajp_get_cping_mode(const char *m, int def) +{ + int mv = def; + if (!m) + return mv; + while (*m != '\0') { + if (*m == 'C' || *m == 'c') + mv |= AJP_CPING_CONNECT; + else if (*m == 'P' || *m == 'p') + mv |= AJP_CPING_PREPOST; + else if (*m == 'I' || *m == 'i') + mv |= AJP_CPING_INTERVAL; + else if (*m == 'A' || *m == 'a') { + mv = AJP_CPING_CONNECT | AJP_CPING_PREPOST | AJP_CPING_INTERVAL; + break; + } + m++; + } + return mv; +} + +/* + * Message structure + * + * +AJPV13_REQUEST/AJPV14_REQUEST= + request_prefix (1) (byte) + method (byte) + protocol (string) + req_uri (string) + remote_addr (string) + remote_host (string) + server_name (string) + server_port (short) + is_ssl (boolean) + num_headers (short) + num_headers*(req_header_name header_value) + + ?context (byte)(string) + ?servlet_path (byte)(string) + ?remote_user (byte)(string) + ?auth_type (byte)(string) + ?query_string (byte)(string) + ?route (byte)(string) + ?ssl_cert (byte)(string) + ?ssl_cipher (byte)(string) + ?ssl_session (byte)(string) + ?ssl_key_size (byte)(int) via JkOptions +ForwardKeySize + request_terminator (byte) + ?body content_length*(var binary) + + */ + +static int ajp_marshal_into_msgb(jk_msg_buf_t *msg, + jk_ws_service_t *s, + jk_logger_t *l, ajp_endpoint_t * ae) +{ + int method; + unsigned int i; + + JK_TRACE_ENTER(l); + + if ((method = sc_for_req_method(s->method, + strlen(s->method))) == UNKNOWN_METHOD) + method = SC_M_JK_STORED; + + if (jk_b_append_byte(msg, JK_AJP13_FORWARD_REQUEST) || + jk_b_append_byte(msg, (unsigned char)method) || + jk_b_append_string(msg, s->protocol) || + jk_b_append_string(msg, s->req_uri) || + jk_b_append_string(msg, s->remote_addr) || + jk_b_append_string(msg, s->remote_host) || + jk_b_append_string(msg, s->server_name) || + jk_b_append_int(msg, (unsigned short)s->server_port) || + jk_b_append_byte(msg, (unsigned char)(s->is_ssl)) || + jk_b_append_int(msg, (unsigned short)(s->num_headers))) { + + jk_log(l, JK_LOG_ERROR, + "failed appending the message begining"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + for (i = 0; i < s->num_headers; i++) { + int sc; + + if ((sc = sc_for_req_header(s->headers_names[i])) != UNKNOWN_METHOD) { + if (jk_b_append_int(msg, (unsigned short)sc)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the header name"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + else { + if (jk_b_append_string(msg, s->headers_names[i])) { + jk_log(l, JK_LOG_ERROR, + "failed appending the header name"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + if (jk_b_append_string(msg, s->headers_values[i])) { + jk_log(l, JK_LOG_ERROR, + "failed appending the header value"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + if (s->secret) { + if (jk_b_append_byte(msg, SC_A_SECRET) || + jk_b_append_string(msg, s->secret)) { + jk_log(l, JK_LOG_ERROR, + "failed appending secret"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + if (s->remote_user) { + if (jk_b_append_byte(msg, SC_A_REMOTE_USER) || + jk_b_append_string(msg, s->remote_user)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the remote user"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + if (s->auth_type) { + if (jk_b_append_byte(msg, SC_A_AUTH_TYPE) || + jk_b_append_string(msg, s->auth_type)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the auth type"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + if (s->query_string) { + if (jk_b_append_byte(msg, SC_A_QUERY_STRING) || +#if defined(AS400) && !defined(AS400_UTF8) + jk_b_append_asciistring(msg, s->query_string)) { +#else + jk_b_append_string(msg, s->query_string)) { +#endif + jk_log(l, JK_LOG_ERROR, + "failed appending the query string"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + if (s->route) { + if (jk_b_append_byte(msg, SC_A_ROUTE) || + jk_b_append_string(msg, s->route)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the route"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + if (s->ssl_cert_len) { + if (jk_b_append_byte(msg, SC_A_SSL_CERT) || + jk_b_append_string(msg, s->ssl_cert)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the SSL certificates"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + if (s->ssl_cipher) { + if (jk_b_append_byte(msg, SC_A_SSL_CIPHER) || + jk_b_append_string(msg, s->ssl_cipher)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the SSL ciphers"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + if (s->ssl_session) { + if (jk_b_append_byte(msg, SC_A_SSL_SESSION) || + jk_b_append_string(msg, s->ssl_session)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the SSL session"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + /* + * ssl_key_size is required by Servlet 2.3 API + * added support only in ajp14 mode + * JFC removed: ae->proto == AJP14_PROTO + */ + if (s->ssl_key_size != -1) { + if (jk_b_append_byte(msg, SC_A_SSL_KEY_SIZE) || + jk_b_append_int(msg, (unsigned short)s->ssl_key_size)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the SSL key size"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + /* If the method was unrecognized, encode it as an attribute */ + if (method == SC_M_JK_STORED) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, "unknown method %s", s->method); + if (jk_b_append_byte(msg, SC_A_STORED_METHOD) || + jk_b_append_string(msg, s->method)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the request method"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + /* Forward the remote port information, which was forgotten + * from the builtin data of the AJP 13 protocol. + * Since the servlet spec allows to retrieve it via getRemotePort(), + * we provide the port to the Tomcat connector as a request + * attribute. Modern Tomcat versions know how to retrieve + * the remote port from this attribute. + */ + { + if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) || + jk_b_append_string(msg, SC_A_REQ_REMOTE_PORT) || + jk_b_append_string(msg, s->remote_port)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the remote port %s", + s->remote_port); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + /* Forward activation information from the load balancer. + * It can be used by the backend to deny access by requests, + * which come with a session id but for an invalid session. + * Such requests get forwarded to backends even if they + * are disabled" in the load balancer, because the balancer + * does not know, which sessions are valid. + * If the backend can check, that is was "disabled" it can + * delete the session cookie and respond with a self-referential + * redirect. The new request will then be balanced to some + * other node that is not disabled. + */ + { + if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) || + jk_b_append_string(msg, SC_A_JK_LB_ACTIVATION) || + jk_b_append_string(msg, s->activation)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the activation state %s", + s->activation); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + + if (s->num_attributes > 0) { + for (i = 0; i < s->num_attributes; i++) { + if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) || + jk_b_append_string(msg, s->attributes_names[i]) || + jk_b_append_string(msg, s->attributes_values[i])) { + jk_log(l, JK_LOG_ERROR, + "failed appending attribute %s=%s", + s->attributes_names[i], s->attributes_values[i]); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + } + + if (jk_b_append_byte(msg, SC_A_ARE_DONE)) { + jk_log(l, JK_LOG_ERROR, + "failed appending the message end"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, "ajp marshaling done"); + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +/* +AJPV13_RESPONSE/AJPV14_RESPONSE:= + response_prefix (2) + status (short) + status_msg (short) + num_headers (short) + num_headers*(res_header_name header_value) + *body_chunk + terminator boolean + +req_header_name := + sc_req_header_name | (string) + +res_header_name := + sc_res_header_name | (string) + +header_value := + (string) + +body_chunk := + length (short) + body length*(var binary) + + */ + + +static int ajp_unmarshal_response(jk_msg_buf_t *msg, + jk_res_data_t * d, + ajp_endpoint_t * ae, jk_logger_t *l) +{ + jk_pool_t *p = &ae->pool; + + JK_TRACE_ENTER(l); + + d->status = jk_b_get_int(msg); + if (!d->status) { + jk_log(l, JK_LOG_ERROR, + "NULL status"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + d->msg = (char *)jk_b_get_string(msg); + if (d->msg) { +#if (defined(AS400) && !defined(AS400_UTF8)) || defined(_OSD_POSIX) + jk_xlate_from_ascii(d->msg, strlen(d->msg)); +#endif + } + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "status = %d", d->status); + + d->num_headers = jk_b_get_int(msg); + d->header_names = d->header_values = NULL; + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "Number of headers is = %d", + d->num_headers); + + if (d->num_headers) { + d->header_names = jk_pool_alloc(p, sizeof(char *) * d->num_headers); + d->header_values = jk_pool_alloc(p, sizeof(char *) * d->num_headers); + + if (d->header_names && d->header_values) { + unsigned int i; + for (i = 0; i < d->num_headers; i++) { + unsigned short name = jk_b_pget_int(msg, msg->pos); + + if ((name & 0XFF00) == 0XA000) { + jk_b_get_int(msg); + name = name & 0X00FF; + if (name <= SC_RES_HEADERS_NUM) { + d->header_names[i] = + (char *)long_res_header_for_sc(name); + } + else { + jk_log(l, JK_LOG_ERROR, + "No such sc (%d)", name); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + else { + d->header_names[i] = (char *)jk_b_get_string(msg); + if (!d->header_names[i]) { + jk_log(l, JK_LOG_ERROR, + "NULL header name"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } +#if (defined(AS400) && !defined(AS400_UTF8)) || defined(_OSD_POSIX) + jk_xlate_from_ascii(d->header_names[i], + strlen(d->header_names[i])); +#endif + + } + + d->header_values[i] = (char *)jk_b_get_string(msg); + if (!d->header_values[i]) { + jk_log(l, JK_LOG_ERROR, + "NULL header value"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + +#if (defined(AS400) && !defined(AS400_UTF8)) || defined(_OSD_POSIX) + jk_xlate_from_ascii(d->header_values[i], + strlen(d->header_values[i])); +#endif + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "Header[%d] [%s] = [%s]", + i, d->header_names[i], d->header_values[i]); + } + } + } + + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +/* + * Abort endpoint use + */ +static void ajp_abort_endpoint(ajp_endpoint_t * ae, int shutdown, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + if (shutdown == JK_TRUE && IS_VALID_SOCKET(ae->sd)) { + if (ae->hard_close) { + /* Force unclean connection close to communicate client write errors + * back to Tomcat by aborting AJP response writes. + */ + jk_close_socket(ae->sd, l); + } + else { + jk_shutdown_socket(ae->sd, l); + } + } + ae->worker->s->connected--; + ae->sd = JK_INVALID_SOCKET; + ae->last_op = JK_AJP13_END_RESPONSE; + JK_TRACE_EXIT(l); +} + +/* + * Reset the endpoint (clean buf and close socket) + */ +static void ajp_reset_endpoint(ajp_endpoint_t * ae, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) resetting endpoint with socket %d%s", + ae->worker->name, ae->sd, ae->reuse? "" : " (socket shutdown)"); + if (!ae->reuse) { + ajp_abort_endpoint(ae, JK_TRUE, l); + } + jk_reset_pool(&(ae->pool)); + JK_TRACE_EXIT(l); +} + +/* + * Close the endpoint (close pool and close socket) + */ +void ajp_close_endpoint(ajp_endpoint_t * ae, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) closing endpoint with socket %d%s", + ae->worker->name, ae->sd, ae->reuse ? "" : " (socket shutdown)"); + if (IS_VALID_SOCKET(ae->sd)) { + jk_shutdown_socket(ae->sd, l); + } + ae->sd = JK_INVALID_SOCKET; + jk_close_pool(&(ae->pool)); + free(ae); + JK_TRACE_EXIT(l); +} + + +/** Steal a connection from an idle cache endpoint + * @param ae endpoint that needs a new connection + * @param l logger + * @return JK_FALSE: failure + * JK_TRUE: success + * @remark Always closes old socket endpoint + */ +static int ajp_next_connection(ajp_endpoint_t *ae, jk_logger_t *l) +{ + int rc; + int ret = JK_FALSE; + ajp_worker_t *aw = ae->worker; + + JK_TRACE_ENTER(l); + + /* Close previous socket */ + if (IS_VALID_SOCKET(ae->sd)) + jk_shutdown_socket(ae->sd, l); + /* Mark existing endpoint socket as closed */ + ae->sd = JK_INVALID_SOCKET; + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int i; + for (i = 0; i < aw->ep_cache_sz; i++) { + /* Find cache slot with usable socket */ + if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd)) { + ae->sd = aw->ep_cache[i]->sd; + aw->ep_cache[i]->sd = JK_INVALID_SOCKET; + break; + } + } + JK_LEAVE_CS(&aw->cs, rc); + if (IS_VALID_SOCKET(ae->sd)) { + ret = JK_TRUE; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) Will try pooled connection socket %d from slot %d", + ae->worker->name, ae->sd, i); + } + } + JK_TRACE_EXIT(l); + return ret; +} + +/** Handle the cping/cpong query + * @param ae endpoint + * @param timeout wait timeout in milliseconds + * @param l logger + * @return JK_FALSE: failure + * JK_TRUE: success + * @remark Always closes socket in case of + * a socket error + */ +static int ajp_handle_cping_cpong(ajp_endpoint_t * ae, int timeout, jk_logger_t *l) +{ + int i; + int cmd; + jk_msg_buf_t *msg; + + JK_TRACE_ENTER(l); + + ae->last_errno = 0; + msg = jk_b_new(&ae->pool); + if (!msg) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + if (jk_b_set_buffer_size(msg, 16)) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message buffer"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + jk_b_reset(msg); + jk_b_append_byte(msg, AJP13_CPING_REQUEST); + + /* Send CPing query */ + if (ajp_connection_tcp_send_message(ae, msg, l) != JK_TRUE) { + jk_log(l, JK_LOG_INFO, + "can't send cping query"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + for (i = 0; i < 2; i++) { + /* wait for Pong reply for timeout milliseconds + */ + if (jk_is_input_event(ae->sd, timeout, l) == JK_FALSE) { + ae->last_errno = errno; + jk_log(l, JK_LOG_INFO, "timeout in reply cpong"); + /* We can't trust this connection any more. */ + ajp_abort_endpoint(ae, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + /* Read and check for Pong reply + */ + if (ajp_connection_tcp_get_message(ae, msg, l) != JK_TRUE) { + jk_log(l, JK_LOG_INFO, + "awaited reply cpong, not received"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + if ((cmd = jk_b_get_byte(msg)) != AJP13_CPONG_REPLY) { + /* If the respose was not CPONG it means that + * the previous response was not consumed by the + * client but the AJP messages was already in + * the network buffer. + * silently drop this single extra packet instead + * recycling the connection + */ + if (i || ae->last_op == JK_AJP13_END_RESPONSE || + cmd < JK_AJP13_SEND_BODY_CHUNK || + cmd > AJP13_CPONG_REPLY) { + jk_log(l, JK_LOG_WARNING, + "awaited reply cpong, received %d instead. " + "Closing connection", + cmd); + /* We can't trust this connection any more. */ + ajp_abort_endpoint(ae, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + else { + jk_log(l, JK_LOG_INFO, + "awaited reply cpong, received %d instead. " + "Retrying next packet", + cmd); + + } + } + else { + ae->last_op = AJP13_CPONG_REPLY; + /* We have received Pong reply */ + break; + } + } + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +/** Connect an endpoint to a backend + * @param ae endpoint + * @param l logger + * @return JK_FALSE: failure + * JK_TRUE: success + * @remark Always closes socket in case of + * a socket error + * @remark Cares about ae->last_errno + */ +int ajp_connect_to_endpoint(ajp_endpoint_t * ae, jk_logger_t *l) +{ + char buf[32]; + int rc = JK_TRUE; + + JK_TRACE_ENTER(l); + + ae->last_errno = 0; + ae->sd = jk_open_socket(&ae->worker->worker_inet_addr, + ae->worker->keepalive, + ae->worker->socket_timeout, + ae->worker->socket_connect_timeout, + ae->worker->socket_buf, l); + + if (!IS_VALID_SOCKET(ae->sd)) { + ae->last_errno = errno; + jk_log(l, JK_LOG_INFO, + "Failed opening socket to (%s) (errno=%d)", + jk_dump_hinfo(&ae->worker->worker_inet_addr, buf), ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + ae->worker->s->connected++; + /* set last_access only if needed */ + if (ae->worker->cache_timeout > 0) + ae->last_access = time(NULL); + /* Check if we must execute a logon after the physical connect */ + /* XXX: Not sure, if we really should do logon before cping/cpong */ + /* XXX: and if no cping/cpong is allowed before or after logon. */ + if (ae->worker->logon != NULL) { + rc = ae->worker->logon(ae, l); + if (rc == JK_FALSE) { + jk_log(l, JK_LOG_ERROR, + "(%s) ajp14 worker logon to the backend server failed", + ae->worker->name); + /* Close the socket if unable to logon */ + ajp_abort_endpoint(ae, JK_TRUE, l); + } + } + /* XXX: Should we send a cping also after logon to validate the connection? */ + else if (ae->worker->connect_timeout > 0) { + rc = ajp_handle_cping_cpong(ae, ae->worker->connect_timeout, l); + if (rc == JK_FALSE) + jk_log(l, JK_LOG_ERROR, + "(%s) cping/cpong after connecting to the backend server failed (errno=%d)", + ae->worker->name, ae->last_errno); + } + JK_TRACE_EXIT(l); + return rc; +} + +/* Syncing config values from shm */ +void jk_ajp_pull(ajp_worker_t * aw, int locked, jk_logger_t *l) +{ + int address_change = JK_FALSE; + int port = 0; + char host[JK_SHM_STR_SIZ+1]; + struct sockaddr_in inet_addr; + JK_TRACE_ENTER(l); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "syncing mem for ajp worker '%s' from shm (%u -> %u) [%u->%u]", + aw->name, aw->sequence, aw->s->h.sequence, aw->addr_sequence, aw->s->addr_sequence); + if (locked == JK_FALSE) + jk_shm_lock(); + + aw->cache_timeout = aw->s->cache_timeout; + aw->connect_timeout = aw->s->connect_timeout; + aw->ping_timeout = aw->s->ping_timeout; + aw->reply_timeout = aw->s->reply_timeout; + aw->prepost_timeout = aw->s->prepost_timeout; + aw->recovery_opts = aw->s->recovery_opts; + aw->retries = aw->s->retries; + aw->retry_interval = aw->s->retry_interval; + aw->max_packet_size = aw->s->max_packet_size; + aw->sequence = aw->s->h.sequence; + if (aw->addr_sequence != aw->s->addr_sequence) { + address_change = JK_TRUE; + aw->addr_sequence = aw->s->addr_sequence; + strncpy(host, aw->s->host, JK_SHM_STR_SIZ); + port = aw->s->port; + } + if (locked == JK_FALSE) + jk_shm_unlock(); + + if (address_change == JK_TRUE) { + if (!jk_resolve(host, port, &inet_addr, + aw->worker.we->pool, l)) { + jk_log(l, JK_LOG_ERROR, + "Failed resolving address '%s:%d' for worker '%s'.", + host, port, aw->name); + } + else { + int rc; + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int i; + for (i = 0; i < aw->ep_cache_sz; i++) { + /* Close all connections in the cache */ + if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd)) { + int sd = aw->ep_cache[i]->sd; + aw->ep_cache[i]->sd = JK_INVALID_SOCKET; + aw->ep_cache[i]->addr_sequence = aw->addr_sequence; + jk_shutdown_socket(sd, l); + aw->s->connected--; + } + } + } + aw->port = port; + strncpy(aw->host, host, JK_SHM_STR_SIZ); + memcpy(&(aw->worker_inet_addr), &inet_addr, sizeof(inet_addr)); + if (rc) { + JK_LEAVE_CS(&aw->cs, rc); + } else { + jk_log(l, JK_LOG_ERROR, + "locking thread (errno=%d)", errno); + } + } + } + + JK_TRACE_EXIT(l); +} + +/* Syncing config values to shm */ +void jk_ajp_push(ajp_worker_t * aw, int locked, jk_logger_t *l) +{ + int address_change = JK_FALSE; + + JK_TRACE_ENTER(l); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "syncing shm for ajp worker '%s' from mem (%u -> %u) [%u->%u]", + aw->name, aw->s->h.sequence, aw->sequence, aw->s->addr_sequence, aw->addr_sequence); + if (locked == JK_FALSE) + jk_shm_lock(); + + aw->s->cache_timeout = aw->cache_timeout; + aw->s->connect_timeout = aw->connect_timeout; + aw->s->ping_timeout = aw->ping_timeout; + aw->s->reply_timeout = aw->reply_timeout; + aw->s->prepost_timeout = aw->prepost_timeout; + aw->s->recovery_opts = aw->recovery_opts; + aw->s->retries = aw->retries; + aw->s->retry_interval = aw->retry_interval; + aw->s->max_packet_size = aw->max_packet_size; + aw->s->h.sequence = aw->sequence; + if (aw->s->addr_sequence != aw->addr_sequence) { + address_change = JK_TRUE; + strncpy(aw->s->host, aw->host, JK_SHM_STR_SIZ); + aw->s->port = aw->port; + aw->s->addr_sequence = aw->addr_sequence; + } + if (locked == JK_FALSE) + jk_shm_unlock(); + + if (address_change == JK_TRUE) { + int rc; + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int i; + for (i = 0; i < aw->ep_cache_sz; i++) { + /* Close all connections in the cache */ + if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd)) { + int sd = aw->ep_cache[i]->sd; + aw->ep_cache[i]->sd = JK_INVALID_SOCKET; + aw->ep_cache[i]->addr_sequence = aw->addr_sequence; + jk_shutdown_socket(sd, l); + aw->s->connected--; + } + } + JK_LEAVE_CS(&aw->cs, rc); + } else { + jk_log(l, JK_LOG_ERROR, + "locking thread (errno=%d)", errno); + } + } + JK_TRACE_EXIT(l); +} + +/** Send a message to an endpoint, using corresponding PROTO HEADER + * @param ae endpoint + * @param msg message to send + * @param l logger + * @return JK_FATAL_ERROR: endpoint contains unknown protocol + * JK_FALSE: other failure + * JK_TRUE: success + * @remark Always closes socket in case of + * a socket error, or JK_FATAL_ERROR + * @remark Cares about ae->last_errno + */ +int ajp_connection_tcp_send_message(ajp_endpoint_t * ae, + jk_msg_buf_t *msg, jk_logger_t *l) +{ + int rc; + + JK_TRACE_ENTER(l); + + ae->last_errno = 0; + if (ae->proto == AJP13_PROTO) { + jk_b_end(msg, AJP13_WS_HEADER); + if (JK_IS_DEBUG_LEVEL(l)) + jk_dump_buff(l, JK_LOG_DEBUG, "sending to ajp13", msg); + } + else if (ae->proto == AJP14_PROTO) { + jk_b_end(msg, AJP14_WS_HEADER); + if (JK_IS_DEBUG_LEVEL(l)) + jk_dump_buff(l, JK_LOG_DEBUG, "sending to ajp14", msg); + } + else { + jk_log(l, JK_LOG_ERROR, + "(%s) unknown protocol %d, supported are AJP13/AJP14", + ae->worker->name, ae->proto); + /* We've got a protocol error. */ + /* We can't trust this connection any more, */ + /* because we might have send already parts of the request. */ + ajp_abort_endpoint(ae, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + + /* This is the only place in this function where we use the socket. */ + /* If sendfull gets an error, it implicitely closes the socket. */ + /* So any socket error inside ajp_connection_tcp_send_message */ + /* results in a socket close and invalidated endpoint connection. */ + if ((rc = jk_tcp_socket_sendfull(ae->sd, msg->buf, + msg->len, l)) > 0) { + ae->endpoint.wr += (jk_uint64_t)rc; + JK_TRACE_EXIT(l); + return JK_TRUE; + } + ae->last_errno = errno; + jk_log(l, JK_LOG_INFO, + "sendfull for socket %d returned %d (errno=%d)", + ae->sd, rc, ae->last_errno); + ajp_abort_endpoint(ae, JK_FALSE, l); + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +/** Receive a message from an endpoint, checking PROTO HEADER + * @param ae endpoint + * @param msg message to send + * @param l logger + * @return JK_TRUE: success + * JK_FALSE: could not read the AJP packet header + * JK_AJP_PROTOCOL_ERROR: failure after reading + * the AJP packet header + * @remark Always closes socket in case of + * a socket error + * @remark Cares about ae->last_errno + */ +int ajp_connection_tcp_get_message(ajp_endpoint_t * ae, + jk_msg_buf_t *msg, jk_logger_t *l) +{ + unsigned char head[AJP_HEADER_LEN]; + int rc; + int msglen; + unsigned int header; + char buf[32]; + + JK_TRACE_ENTER(l); + + ae->last_errno = 0; + /* If recvfull gets an error, it implicitely closes the socket. */ + /* We will invalidate the endpoint connection. */ + rc = jk_tcp_socket_recvfull(ae->sd, head, AJP_HEADER_LEN, l); + + /* If the return code is not negative */ + /* then we always get back the correct number of bytes. */ + if (rc < 0) { + if (rc == JK_SOCKET_EOF) { + ae->last_errno = EPIPE; + jk_log(l, JK_LOG_INFO, + "(%s) can't receive the response header message from tomcat, " + "tomcat (%s) has forced a connection close for socket %d", + ae->worker->name, jk_dump_hinfo(&ae->worker->worker_inet_addr, buf), + ae->sd); + } + else { + ae->last_errno = -rc; + jk_log(l, JK_LOG_INFO, + "(%s) can't receive the response header message from tomcat, " + "network problems or tomcat (%s) is down (errno=%d)", + ae->worker->name, jk_dump_hinfo(&ae->worker->worker_inet_addr, buf), + ae->last_errno); + } + ajp_abort_endpoint(ae, JK_FALSE, l); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + ae->endpoint.rd += (jk_uint64_t)rc; + header = ((unsigned int)head[0] << 8) | head[1]; + + if (ae->proto == AJP13_PROTO) { + if (header != AJP13_SW_HEADER) { + + if (header == AJP14_SW_HEADER) { + jk_log(l, JK_LOG_ERROR, + "received AJP14 reply on an AJP13 connection from %s", + jk_dump_hinfo(&ae->worker->worker_inet_addr, buf)); + } + else { + jk_log(l, JK_LOG_ERROR, + "wrong message format 0x%04x from %s", + header, jk_dump_hinfo(&ae->worker->worker_inet_addr, + buf)); + } + /* We've got a protocol error. */ + /* We can't trust this connection any more. */ + ajp_abort_endpoint(ae, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_AJP_PROTOCOL_ERROR; + } + } + else if (ae->proto == AJP14_PROTO) { + if (header != AJP14_SW_HEADER) { + + if (header == AJP13_SW_HEADER) { + jk_log(l, JK_LOG_ERROR, + "received AJP13 reply on an AJP14 connection from %s", + jk_dump_hinfo(&ae->worker->worker_inet_addr, buf)); + } + else { + jk_log(l, JK_LOG_ERROR, + "wrong message format 0x%04x from %s", + header, jk_dump_hinfo(&ae->worker->worker_inet_addr, + buf)); + } + /* We've got a protocol error. */ + /* We can't trust this connection any more. */ + ajp_abort_endpoint(ae, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_AJP_PROTOCOL_ERROR; + } + } + + msglen = ((head[2] & 0xff) << 8); + msglen += (head[3] & 0xFF); + + if (msglen > msg->maxlen) { + jk_log(l, JK_LOG_ERROR, + "wrong message size %d %d from %s", + msglen, msg->maxlen, + jk_dump_hinfo(&ae->worker->worker_inet_addr, buf)); + /* We've got a protocol error. */ + /* We can't trust this connection any more. */ + ajp_abort_endpoint(ae, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_AJP_PROTOCOL_ERROR; + } + + msg->len = msglen; + msg->pos = 0; + + /* If recvfull gets an error, it implicitely closes the socket. */ + /* We will invalidate the endpoint connection. */ + rc = jk_tcp_socket_recvfull(ae->sd, msg->buf, msglen, l); + /* If the return code is not negative */ + /* then we always get back the correct number of bytes. */ + if (rc < 0) { + if (rc == JK_SOCKET_EOF) { + ae->last_errno = EPIPE; + jk_log(l, JK_LOG_ERROR, + "(%s) can't receive the response body message from tomcat, " + "tomcat (%s) has forced a connection close for socket %d", + ae->worker->name, jk_dump_hinfo(&ae->worker->worker_inet_addr, buf), + ae->sd); + } + else { + ae->last_errno = -rc; + jk_log(l, JK_LOG_ERROR, + "(%s) can't receive the response body message from tomcat, " + "network problems or tomcat (%s) is down (errno=%d)", + ae->worker->name, jk_dump_hinfo(&ae->worker->worker_inet_addr, buf), + ae->last_errno); + } + ajp_abort_endpoint(ae, JK_FALSE, l); + JK_TRACE_EXIT(l); + /* Although we have a connection, this is effectively a protocol error. + * We received the AJP header packet, but not the packet payload + */ + return JK_AJP_PROTOCOL_ERROR; + } + ae->endpoint.rd += (jk_uint64_t)rc; + + if (JK_IS_DEBUG_LEVEL(l)) { + if (ae->proto == AJP13_PROTO) + jk_dump_buff(l, JK_LOG_DEBUG, "received from ajp13", msg); + else if (ae->proto == AJP14_PROTO) + jk_dump_buff(l, JK_LOG_DEBUG, "received from ajp14", msg); + } + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +/* + * Read all the data from the socket. + * + * Socket API doesn't guaranty that all the data will be kept in a + * single read, so we must loop until all awaited data is received + */ + +static int ajp_read_fully_from_server(jk_ws_service_t *s, jk_logger_t *l, + unsigned char *buf, unsigned int len) +{ + unsigned int rdlen = 0; + unsigned int padded_len = len; + + JK_TRACE_ENTER(l); + + if (s->is_chunked && s->no_more_chunks) { + JK_TRACE_EXIT(l); + return 0; + } + if (s->is_chunked) { + /* Corner case: buf must be large enough to hold next + * chunk size (if we're on or near a chunk border). + * Pad the length to a reasonable value, otherwise the + * read fails and the remaining chunks are tossed. + */ + padded_len = (len < CHUNK_BUFFER_PAD) ? len : len - CHUNK_BUFFER_PAD; + } + + while (rdlen < padded_len) { + unsigned int this_time = 0; + if (!s->read(s, buf + rdlen, len - rdlen, &this_time)) { + /* Remote Client read failed. */ + JK_TRACE_EXIT(l); + return JK_CLIENT_RD_ERROR; + } + + if (0 == this_time) { + if (s->is_chunked) { + s->no_more_chunks = 1; /* read no more */ + } + break; + } + rdlen += this_time; + } + + JK_TRACE_EXIT(l); + return (int)rdlen; +} + + +/* + * Read data from AJP13/AJP14 protocol + * Returns -1 on error, else number of bytes read + */ + +static int ajp_read_into_msg_buff(ajp_endpoint_t * ae, + jk_ws_service_t *r, + jk_msg_buf_t *msg, int len, jk_logger_t *l) +{ + unsigned char *read_buf = msg->buf; + + JK_TRACE_ENTER(l); + + jk_b_reset(msg); + + read_buf += AJP_HEADER_LEN; /* leave some space for the buffer headers */ + read_buf += AJP_HEADER_SZ_LEN; /* leave some space for the read length */ + + /* Pick the max size since we don't know the content_length */ + if (r->is_chunked && len == 0) { + len = AJP13_MAX_SEND_BODY_SZ; + } + + if ((len = ajp_read_fully_from_server(r, l, read_buf, len)) < 0) { + jk_log(l, JK_LOG_INFO, + "(%s) receiving data from client failed. " + "Connection aborted or network problems", + ae->worker->name); + JK_TRACE_EXIT(l); + return JK_CLIENT_RD_ERROR; + } + + if (!r->is_chunked) { + ae->left_bytes_to_send -= len; + } + + if (len > 0) { + /* Recipient recognizes empty packet as end of stream, not + an empty body packet */ + if (0 != jk_b_append_int(msg, (unsigned short)len)) { + jk_log(l, JK_LOG_INFO, + "Failed appending message length"); + JK_TRACE_EXIT(l); + return JK_CLIENT_RD_ERROR; + } + } + + msg->len += len; + + JK_TRACE_EXIT(l); + return len; +} + + +/* + * send request to Tomcat via Ajp13 + * - first try to find reuseable socket + * - if no such available, try to connect + * - send request, but send must be seen as asynchronous, + * since send() call will return noerror about 95% of time + * Hopefully we'll get more information on next read. + * + * nb: op->request is the original request msg buffer + * op->reply is the reply msg buffer which could be scratched + * + * Return values of ajp_send_request() function: + * return value op->recoverable reason + * JK_FATAL_ERROR JK_FALSE ajp_connection_tcp_send_message() returns JK_FATAL_ERROR + * Endpoint belongs to unknown protocol. + * JK_FATAL_ERROR JK_TRUE ajp_connection_tcp_send_message() returns JK_FALSE + * Sending request or request body in jk_tcp_socket_sendfull() returns with error. + * JK_FATAL_ERROR JK_TRUE Could not connect to backend + * JK_CLIENT_RD_ERROR JK_FALSE Error during reading parts of POST body from client + * JK_TRUE JK_TRUE All other cases (OK) + */ +static int ajp_send_request(jk_endpoint_t *e, + jk_ws_service_t *s, + jk_logger_t *l, + ajp_endpoint_t * ae, ajp_operation_t * op) +{ + int err_conn = 0; + int err_cping = 0; + int err_send = 0; + int rc; + int postlen; + + JK_TRACE_ENTER(l); + + ae->last_errno = 0; + /* Up to now, we can recover */ + op->recoverable = JK_TRUE; + + /* Check if the previous request really ended + */ + if (ae->last_op != JK_AJP13_END_RESPONSE && + ae->last_op != AJP13_CPONG_REPLY) { + jk_log(l, JK_LOG_INFO, + "(%s) did not receive END_RESPONSE, " + "closing socket %d", + ae->worker->name, ae->sd); + ajp_abort_endpoint(ae, JK_TRUE, l); + } + /* + * First try to check open connections... + */ + while (IS_VALID_SOCKET(ae->sd)) { + int err = JK_FALSE; + if (jk_is_socket_connected(ae->sd, l) == JK_FALSE) { + ae->last_errno = errno; + jk_log(l, JK_LOG_DEBUG, + "(%s) failed sending request, " + "socket %d is not connected any more (errno=%d)", + ae->worker->name, ae->sd, ae->last_errno); + ajp_abort_endpoint(ae, JK_FALSE, l); + err = JK_TRUE; + err_conn++; + } + if (ae->worker->prepost_timeout > 0 && !err) { + /* handle cping/cpong if prepost_timeout is set + * If the socket is disconnected no need to handle + * the cping/cpong + */ + if (ajp_handle_cping_cpong(ae, + ae->worker->prepost_timeout, l) == JK_FALSE) { + jk_log(l, JK_LOG_INFO, + "(%s) failed sending request, " + "socket %d prepost cping/cpong failure (errno=%d)", + ae->worker->name, ae->sd, ae->last_errno); + /* XXX: Is there any reason to try other + * connections to the node if one of them fails + * the cping/cpong heartbeat? + * Tomcat can be either too busy or simply dead, so + * there is a chance that all other connections would + * fail as well. + */ + err = JK_TRUE; + err_cping++; + } + } + + /* We've got a connected socket and the optional + * cping/cpong worked, so let's send the request now. + */ + if (err == JK_FALSE) { + rc = ajp_connection_tcp_send_message(ae, op->request, l); + /* If this worked, we can break out of the loop + * and proceed with the request. + */ + if (rc == JK_TRUE) { + ae->last_op = JK_AJP13_FORWARD_REQUEST; + break; + } + /* Error during sending the request. + */ + err_send++; + if (rc == JK_FATAL_ERROR) + op->recoverable = JK_FALSE; + jk_log(l, JK_LOG_INFO, + "(%s) failed sending request (%srecoverable) " + "(errno=%d)", + ae->worker->name, + op->recoverable ? "" : "un", + ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + /* If we got an error or can't send data, then try to steal another pooled + * connection and try again. If we are not successful, break out of this + * loop and try to open a new connection after the loop. + */ + if (ajp_next_connection(ae, l) == JK_FALSE) + break; + } + + /* + * If we failed to reuse a connection, try to reconnect. + */ + if (!IS_VALID_SOCKET(ae->sd)) { + /* Could not steal any connection from an endpoint - backend is disconnected */ + if (err_conn + err_cping + err_send > 0) + jk_log(l, JK_LOG_INFO, + "(%s) all endpoints are disconnected, " + "detected by connect check (%d), cping (%d), send (%d)", + ae->worker->name, err_conn, err_cping, err_send); + else if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) all endpoints are disconnected.", + ae->worker->name); + /* Connect to the backend. + */ + if (ajp_connect_to_endpoint(ae, l) != JK_TRUE) { + jk_log(l, JK_LOG_ERROR, + "(%s) connecting to backend failed. Tomcat is probably not started " + "or is listening on the wrong port (errno=%d)", + ae->worker->name, ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + if (ae->worker->connect_timeout <= 0 && + ae->worker->prepost_timeout > 0) { + /* handle cping/cpong if prepost_timeout is set + * and we didn't already do a connect cping/cpong. + */ + if (ajp_handle_cping_cpong(ae, + ae->worker->prepost_timeout, l) == JK_FALSE) { + jk_log(l, JK_LOG_INFO, + "(%s) failed sending request, " + "socket %d prepost cping/cpong failure (errno=%d)", + ae->worker->name, ae->sd, ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + } + + /* We've got a connected socket and the optional + * cping/cpong worked, so let's send the request now. + */ + rc = ajp_connection_tcp_send_message(ae, op->request, l); + /* Error during sending the request. + */ + if (rc != JK_TRUE) { + if (rc == JK_FATAL_ERROR) + op->recoverable = JK_FALSE; + jk_log(l, JK_LOG_ERROR, + "(%s) failed sending request on a fresh connection (%srecoverable), " + "socket %d (errno=%d)", + ae->worker->name, op->recoverable ? "" : "un", + ae->sd, ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + ae->last_op = JK_AJP13_FORWARD_REQUEST; + } + else if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) Statistics about invalid connections: " + "connect check (%d), cping (%d), send (%d)", + ae->worker->name, err_conn, err_cping, err_send); + + /* + * From now on an error means that we have an internal server error + * or Tomcat crashed. + */ + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) request body to send %" JK_UINT64_T_FMT " - request body to resend %d", + ae->worker->name, ae->left_bytes_to_send, + op->reply->len - AJP_HEADER_LEN); + + /* + * POST recovery job is done here and will work when data to + * POST are less than 8k, since it's the maximum size of op-post buffer. + * We send here the first part of data which was sent previously to the + * remote Tomcat + */ + + /* Did we have something to resend (ie the op-post has been feeded previously */ + + postlen = op->post->len; + if (postlen > AJP_HEADER_LEN) { + rc = ajp_connection_tcp_send_message(ae, op->post, l); + /* Error during sending the request body. + */ + if (rc != JK_TRUE) { + if (rc == JK_FATAL_ERROR) + op->recoverable = JK_FALSE; + jk_log(l, JK_LOG_ERROR, + "(%s) failed sending request body of size %d (%srecoverable), " + "socket %d (errno=%d)", + ae->worker->name, postlen, op->recoverable ? "" : "un", + ae->sd, ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + else { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, "Resent the request body (%d)", + postlen); + } + } + else if (s->reco_status == RECO_FILLED) { + /* Recovery in LB MODE */ + postlen = s->reco_buf->len; + + if (postlen > AJP_HEADER_LEN) { + rc = ajp_connection_tcp_send_message(ae, s->reco_buf, l); + /* Error during sending the request body. + */ + if (rc != JK_TRUE) { + if (rc == JK_FATAL_ERROR) + op->recoverable = JK_FALSE; + jk_log(l, JK_LOG_ERROR, + "(%s) failed sending request body of size %d (lb mode) (%srecoverable), " + "socket %d (errno=%d)", + ae->worker->name, postlen, op->recoverable ? "" : "un", + ae->sd, ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + } + else { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "Resent the request body (lb mode) (%d)", postlen); + } + } + else { + /* We never sent any POST data and we check if we have to send at + * least one block of data (max 8k). These data will be kept in reply + * for resend if the remote Tomcat is down, a fact we will learn only + * doing a read (not yet) + */ + /* || s->is_chunked - this can't be done here. The original protocol + sends the first chunk of post data ( based on Content-Length ), + and that's what the java side expects. + Sending this data for chunked would break other ajp13 servers. + + Note that chunking will continue to work - using the normal read. + */ + + if (ae->left_bytes_to_send > 0) { + int len = AJP13_MAX_SEND_BODY_SZ; + if (ae->left_bytes_to_send < (jk_uint64_t)AJP13_MAX_SEND_BODY_SZ) { + len = (int)ae->left_bytes_to_send; + } + if ((len = ajp_read_into_msg_buff(ae, s, op->post, len, l)) <= 0) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) browser stop sending data, no need to recover", + ae->worker->name); + op->recoverable = JK_FALSE; + /* Send an empty POST message since per AJP protocol + * spec whenever we have content length the message + * packet must be followed with initial POST packet. + * Size zero will be handled as error in container. + */ + jk_b_reset(op->post); + jk_b_append_int(op->post, 0); + ajp_connection_tcp_send_message(ae, op->post, l); + JK_TRACE_EXIT(l); + return JK_CLIENT_RD_ERROR; + } + + /* If a RECOVERY buffer is available in LB mode, fill it */ + if (s->reco_status == RECO_INITED) { + jk_b_copy(op->post, s->reco_buf); + s->reco_status = RECO_FILLED; + } + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "(%s) sending %d bytes of request body", + ae->worker->name, len); + + s->content_read = (jk_uint64_t)len; + rc = ajp_connection_tcp_send_message(ae, op->post, l); + /* Error during sending the request body. + */ + if (rc != JK_TRUE) { + if (rc == JK_FATAL_ERROR) + op->recoverable = JK_FALSE; + jk_log(l, JK_LOG_ERROR, + "(%s) failed sending request body of size %d (%srecoverable), " + "socket %d (errno=%d)", + ae->worker->name, len, op->recoverable ? "" : "un", + ae->sd, ae->last_errno); + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + } + } + JK_TRACE_EXIT(l); + return JK_TRUE; +} + + +/* + * What to do with incoming data (dispatcher) + */ + +static int ajp_process_callback(jk_msg_buf_t *msg, + jk_msg_buf_t *pmsg, + ajp_endpoint_t * ae, + jk_ws_service_t *r, jk_logger_t *l) +{ + int code = (int)jk_b_get_byte(msg); + + JK_TRACE_ENTER(l); + + switch (code) { + case JK_AJP13_SEND_HEADERS: + { + int rc; + jk_res_data_t res; + if (ae->last_op == JK_AJP13_SEND_HEADERS) { + /* Do not send anything to the client. + * Backend already send us the headers. + */ + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, + "Already received AJP13_SEND HEADERS"); + } + JK_TRACE_EXIT(l); + return JK_AJP13_ERROR; + } + if (!ajp_unmarshal_response(msg, &res, ae, l)) { + jk_log(l, JK_LOG_ERROR, + "ajp_unmarshal_response failed"); + JK_TRACE_EXIT(l); + return JK_AJP13_ERROR; + } + r->http_response_status = res.status; + if (r->extension.fail_on_status_size > 0) + rc = is_http_status_fail(r->extension.fail_on_status_size, + r->extension.fail_on_status, res.status); + else + rc = is_http_status_fail(ae->worker->http_status_fail_num, + ae->worker->http_status_fail, res.status); + if (rc > 0) { + JK_TRACE_EXIT(l); + return JK_STATUS_FATAL_ERROR; + } + else if (rc < 0) { + JK_TRACE_EXIT(l); + return JK_STATUS_ERROR; + } + + if (r->extension.use_server_error_pages && + r->http_response_status >= r->extension.use_server_error_pages) + r->response_blocked = JK_TRUE; + + /* + * Call even if response is blocked, since it also handles + * forwarding some headers for special http status codes + * even if the server uses an own error page. + * Example: The WWW-Authenticate header in case of + * HTTP_UNAUTHORIZED (401). + */ + r->start_response(r, res.status, res.msg, + (const char *const *)res.header_names, + (const char *const *)res.header_values, + res.num_headers); + + if (!r->response_blocked) { + if (r->flush && r->flush_header) + r->flush(r); + } + } + return JK_AJP13_SEND_HEADERS; + + case JK_AJP13_SEND_BODY_CHUNK: + if (ae->last_op == JK_AJP13_FORWARD_REQUEST) { + /* AJP13_SEND_BODY_CHUNK with length 0 is + * explicit flush packet message. + * Ignore those if they are left over from previous responses. + * Reportedly some versions of JBoss suffer from that problem. + */ + if (jk_b_get_int(msg) == 0) { + jk_log(l, JK_LOG_DEBUG, + "Ignoring flush message received while sending the request"); + return ae->last_op; + } + /* We have just send a request but received something + * that probably originates from buffered response. + */ + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, + "Unexpected AJP13_SEND_BODY_CHUNK"); + } + JK_TRACE_EXIT(l); + return JK_AJP13_ERROR; + } + if (!r->response_blocked) { + unsigned int len = (unsigned int)jk_b_get_int(msg); + /* + * Do a sanity check on len to prevent write reading beyond buffer + * boundaries and thus revealing possible sensitive memory + * contents to the client. + * len cannot be larger than msg->len - 3 because the ajp message + * contains the magic byte for JK_AJP13_SEND_BODY_CHUNK (1 byte) + * and the length of the chunk (2 bytes). The remaining part of + * the message is the chunk. + */ + if (len > (unsigned int)(msg->len - 3)) { + jk_log(l, JK_LOG_ERROR, + "Chunk length too large. Length of AJP message is %i," + " chunk length is %i.", msg->len, len); + JK_TRACE_EXIT(l); + return JK_INTERNAL_ERROR; + } + if (len == 0) { + /* AJP13_SEND_BODY_CHUNK with length 0 is + * explicit flush packet message. + */ + if (r->response_started) { + if (r->flush) { + r->flush(r); + } + } + else { + jk_log(l, JK_LOG_DEBUG, + "Ignoring flush message received before headers"); + } + } + else { + if (!r->write(r, msg->buf + msg->pos, len)) { + jk_log(l, JK_LOG_INFO, + "Writing to client aborted or client network problems"); + JK_TRACE_EXIT(l); + return JK_CLIENT_WR_ERROR; + } + if (r->flush && r->flush_packets) + r->flush(r); + } + } + break; + + case JK_AJP13_GET_BODY_CHUNK: + { + int len = (int)jk_b_get_int(msg); + + if (len < 0) { + len = 0; + } + if (len > AJP13_MAX_SEND_BODY_SZ) { + len = AJP13_MAX_SEND_BODY_SZ; + } + if ((jk_uint64_t)len > ae->left_bytes_to_send) { + len = (int)ae->left_bytes_to_send; + } + + /* the right place to add file storage for upload */ + if ((len = ajp_read_into_msg_buff(ae, r, pmsg, len, l)) >= 0) { + r->content_read += (jk_uint64_t)len; + JK_TRACE_EXIT(l); + return JK_AJP13_HAS_RESPONSE; + } + + jk_log(l, JK_LOG_INFO, + "Reading from client aborted or client network problems"); + + JK_TRACE_EXIT(l); + return JK_CLIENT_RD_ERROR; + } + break; + + case JK_AJP13_END_RESPONSE: + ae->reuse = (int)jk_b_get_byte(msg); + if (!ae->reuse) { + /* + * AJP13 protocol reuse flag set to false. + * Tomcat will close its side of the connection. + */ + jk_log(l, JK_LOG_WARNING, "AJP13 protocol: Reuse is set to false"); + } + else if (r->disable_reuse) { + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, "AJP13 protocol: Reuse is disabled"); + } + ae->reuse = JK_FALSE; + } + else { + /* Reuse in all cases */ + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, "AJP13 protocol: Reuse is OK"); + } + ae->reuse = JK_TRUE; + } + if (!r->response_blocked) { + if (r->done) { + /* Done with response */ + r->done(r); + } + else if (r->flush && !r->flush_packets) { + /* Flush after the last write */ + r->flush(r); + } + } + JK_TRACE_EXIT(l); + return JK_AJP13_END_RESPONSE; + break; + + default: + jk_log(l, JK_LOG_ERROR, + "Unknown AJP protocol code: %02X", code); + JK_TRACE_EXIT(l); + return JK_AJP13_ERROR; + } + + JK_TRACE_EXIT(l); + return JK_AJP13_NO_RESPONSE; +} + +/* + * get replies from Tomcat via Ajp13/Ajp14 + * ajp13/ajp14 is async but handling read/send this way prevent nice recovery + * In fact if tomcat link is broken during upload (browser -> apache -> tomcat) + * we'll loose data and we'll have to abort the whole request. + * + * Return values of ajp_get_reply() function: + * return value op->recoverable reason + * JK_REPLY_TIMEOUT ?recovery_options Reply timeout while waiting for response packet + * JK_FALSE ?recovery_options Error during ajp_connection_tcp_get_message() + * Could not read the AJP packet header + * JK_AJP_PROTOCOL_ERROR: ?recovery_options Error during ajp_connection_tcp_get_message() + * Failure after reading the AJP packet header + * JK_STATUS_ERROR mostly JK_TRUE ajp_process_callback() returns JK_STATUS_ERROR + * Recoverable, if callback didn't return with a JK_HAS_RESPONSE before. + * JK_HAS_RESPONSE: parts of the post buffer are consumed. + * JK_STATUS_FATAL_ERROR mostly JK_TRUE ajp_process_callback() returns JK_STATUS_FATAL_ERROR + * Recoverable, if callback didn't return with a JK_HAS_RESPONSE before. + * JK_HAS_RESPONSE: parts of the post buffer are consumed. + * JK_FATAL_ERROR ? ajp_process_callback() returns JK_AJP13_ERROR + * JK_AJP13_ERROR: protocol error, or JK_INTERNAL_ERROR: chunk size to large + * JK_CLIENT_RD_ERROR ? ajp_process_callback() returns JK_CLIENT_RD_ERROR + * JK_CLIENT_RD_ERROR: could not read post from client. + * JK_CLIENT_WR_ERROR ? ajp_process_callback() returns JK_CLIENT_WR_ERROR + * JK_CLIENT_WR_ERROR: could not write back result to client + * JK_TRUE ? ajp_process_callback() returns JK_AJP13_END_RESPONSE + * JK_FALSE ? Other unhandled cases (unknown return codes) + */ +static int ajp_get_reply(jk_endpoint_t *e, + jk_ws_service_t *s, + jk_logger_t *l, + ajp_endpoint_t * p, ajp_operation_t * op) +{ + /* Don't get header from tomcat yet */ + int headeratclient = JK_FALSE; + + JK_TRACE_ENTER(l); + + p->last_errno = 0; + /* Start read all reply message */ + while (1) { + int rc = 0; + /* Allow to overwrite reply_timeout on a per URL basis via service struct */ + int reply_timeout = s->extension.reply_timeout; + if (reply_timeout < 0) + reply_timeout = p->worker->reply_timeout; + /* If we set a reply timeout, check if something is available */ + if (reply_timeout > 0) { + if (jk_is_input_event(p->sd, reply_timeout, l) == + JK_FALSE) { + p->last_errno = errno; + jk_log(l, JK_LOG_ERROR, + "(%s) Timeout with waiting reply from tomcat. " + "Tomcat is down, stopped or network problems (errno=%d)", + p->worker->name, p->last_errno); + /* We can't trust this connection any more. */ + ajp_abort_endpoint(p, JK_TRUE, l); + if (headeratclient == JK_FALSE) { + if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCGETREQUEST) + op->recoverable = JK_FALSE; + /* + * We revert back to recoverable, if recovery_opts allow it for GET or HEAD + */ + if (op->recoverable == JK_FALSE) { + if (p->worker->recovery_opts & RECOVER_ALWAYS_HTTP_HEAD) { + if (!strcmp(s->method, "HEAD")) + op->recoverable = JK_TRUE; + } + else if (p->worker->recovery_opts & RECOVER_ALWAYS_HTTP_GET) { + if (!strcmp(s->method, "GET")) + op->recoverable = JK_TRUE; + } + } + } + else { + if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCSENDHEADER) + op->recoverable = JK_FALSE; + } + + JK_TRACE_EXIT(l); + return JK_REPLY_TIMEOUT; + } + } + + if ((rc = ajp_connection_tcp_get_message(p, op->reply, l)) != JK_TRUE) { + if (headeratclient == JK_FALSE) { + jk_log(l, JK_LOG_ERROR, + "(%s) Tomcat is down or refused connection. " + "No response has been sent to the client (yet)", + p->worker->name); + /* + * communication with tomcat has been interrupted BEFORE + * headers have been sent to the client. + */ + + /* + * We mark it unrecoverable if recovery_opts set to RECOVER_ABORT_IF_TCGETREQUEST + */ + if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCGETREQUEST) + op->recoverable = JK_FALSE; + /* + * We revert back to recoverable, if recovery_opts allow it for GET or HEAD + */ + if (op->recoverable == JK_FALSE) { + if (p->worker->recovery_opts & RECOVER_ALWAYS_HTTP_HEAD) { + if (!strcmp(s->method, "HEAD")) + op->recoverable = JK_TRUE; + } + else if (p->worker->recovery_opts & RECOVER_ALWAYS_HTTP_GET) { + if (!strcmp(s->method, "GET")) + op->recoverable = JK_TRUE; + } + } + + } + else { + jk_log(l, JK_LOG_ERROR, + "(%s) Tomcat is down or network problems. " + "Part of the response has already been sent to the client", + p->worker->name); + + /* communication with tomcat has been interrupted AFTER + * headers have been sent to the client. + * headers (and maybe parts of the body) have already been + * sent, therefore the response is "complete" in a sense + * that nobody should append any data, especially no 500 error + * page of the webserver! + */ + + /* + * We mark it unrecoverable if recovery_opts set to RECOVER_ABORT_IF_TCSENDHEADER + */ + if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCSENDHEADER) + op->recoverable = JK_FALSE; + + } + + JK_TRACE_EXIT(l); + return rc; + } + + rc = ajp_process_callback(op->reply, op->post, p, s, l); + p->last_op = rc; + /* no more data to be sent, fine we have finish here */ + if (JK_AJP13_END_RESPONSE == rc) { + JK_TRACE_EXIT(l); + return JK_TRUE; + } + else if (JK_AJP13_SEND_HEADERS == rc) { + if (headeratclient == JK_FALSE) + headeratclient = JK_TRUE; + else { + /* Backend send headers twice? + * This is protocol violation + */ + jk_log(l, JK_LOG_ERROR, + "(%s) Tomcat already send headers", + p->worker->name); + op->recoverable = JK_FALSE; + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + else if (JK_STATUS_ERROR == rc || JK_STATUS_FATAL_ERROR == rc) { + jk_log(l, JK_LOG_INFO, + "(%s) request failed%s, " + "because of response status %d, ", + p->worker->name, + rc == JK_STATUS_FATAL_ERROR ? "" : " (soft)", + s->http_response_status); + JK_TRACE_EXIT(l); + return rc; + } + else if (JK_AJP13_HAS_RESPONSE == rc) { + /* + * in upload-mode there is no second chance since + * we may have already sent part of the uploaded data + * to Tomcat. + * In this case if Tomcat connection is broken we must + * abort request and indicate error. + * A possible work-around could be to store the uploaded + * data to file and replay for it + */ + op->recoverable = JK_FALSE; + rc = ajp_connection_tcp_send_message(p, op->post, l); + if (rc < 0) { + jk_log(l, JK_LOG_ERROR, + "(%s) Tomcat is down or network problems", + p->worker->name); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + else if (JK_AJP13_ERROR == rc) { + /* + * Tomcat has send invalid AJP message. + * Loadbalancer if present will decide if + * failover is possible. + */ + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + else if (JK_CLIENT_RD_ERROR == rc) { + /* + * Client has stop sending to us, so get out. + * We assume this isn't our fault, so just a normal exit. + */ + JK_TRACE_EXIT(l); + return JK_CLIENT_RD_ERROR; + } + else if (JK_CLIENT_WR_ERROR == rc) { + /* + * Client has stop receiving to us, so get out. + * We assume this isn't our fault, so just a normal exit. + */ + JK_TRACE_EXIT(l); + return JK_CLIENT_WR_ERROR; + } + else if (JK_INTERNAL_ERROR == rc) { + /* + * Internal error, like memory allocation or invalid packet lengths. + */ + JK_TRACE_EXIT(l); + return JK_FATAL_ERROR; + } + else if (JK_AJP13_NO_RESPONSE == rc) { + /* + * This is fine, loop again, more data to send. + */ + continue; + } + else if (rc < 0) { + op->recoverable = JK_FALSE; + jk_log(l, JK_LOG_ERROR, + "(%s) Callback returns with unknown value %d", + p->worker->name, rc); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + /* XXX: Not reached? */ + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +static void ajp_update_stats(jk_endpoint_t *e, ajp_worker_t *aw, int rc, jk_logger_t *l) +{ + aw->s->readed += e->rd; + aw->s->transferred += e->wr; + if (aw->s->busy) + aw->s->busy--; + if (rc == JK_TRUE) { + aw->s->state = JK_AJP_STATE_OK; + } + else if (rc == JK_CLIENT_ERROR) { + aw->s->state = JK_AJP_STATE_OK; + aw->s->client_errors++; + } + else { + aw->s->state = JK_AJP_STATE_ERROR; + aw->s->errors++; + aw->s->error_time = time(NULL); + } +} + +/* + * service is now splitted in ajp_send_request and ajp_get_reply + * much more easier to do errors recovery + * + * We serve here the request, using AJP13/AJP14 (e->proto) + * + * Return values of service() method for ajp13/ajp14 worker: + * return value is_error e->recoverable reason + * JK_FALSE JK_HTTP_SERVER_ERROR TRUE Invalid Parameters (null values) + * JK_SERVER_ERROR JK_HTTP_SERVER_ERROR TRUE Error during initializing empty request, response or post body objects + * JK_CLIENT_ERROR JK_HTTP_REQUEST_TOO_LARGE JK_TRUE Request doesn't fit into buffer (error during ajp_marshal_into_msgb()) + * JK_CLIENT_ERROR JK_HTTP_BAD_REQUEST JK_FALSE Error during reading parts of POST body from client + * JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_FALSE If ajp_send_request() returns JK_FATAL_ERROR and !op->recoverable. + * JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_FALSE If ajp_send_request() returns JK_TRUE but !op->recoverable. + * This should never happen. + * JK_CLIENT_ERROR JK_HTTP_BAD_REQUEST ? ajp_get_reply() returns JK_CLIENT_RD_ERROR + * JK_CLIENT_ERROR JK_HTTP_OK ? ajp_get_reply() returns JK_CLIENT_WR_ERROR + * JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_TRUE ajp_get_reply() returns JK_FATAL_ERROR + * JK_FATAL_ERROR: protocol error or internal error + * JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_FALSE ajp_get_reply() returns JK_FATAL_ERROR + * JK_FATAL_ERROR: protocol error or internal error + * JK_STATUS_ERROR JK_HTTP_SERVER_BUSY JK_TRUE ajp_get_reply() returns JK_STATUS_ERROR + * Only if op->recoverable and no more ajp13/ajp14 direct retries + * JK_STATUS_ERROR JK_HTTP_SERVER_BUSY JK_FALSE ajp_get_reply() returns JK_STATUS_ERROR + * Only if !op->recoverable + * JK_STATUS_FATAL_ERROR JK_HTTP_SERVER_BUSY JK_TRUE ajp_get_reply() returns JK_STATUS_ERROR + * Only if op->recoverable and no more ajp13/ajp14 direct retries + * JK_STATUS_FATAL_ERROR JK_HTTP_SERVER_BUSY JK_FALSE ajp_get_reply() returns JK_STATUS_FATAL_ERROR + * Only if !op->recoverable + * JK_REPLY_TIMEOUT JK_HTTP_GATEWAY_TIME_OUT JK_TRUE ajp_get_reply() returns JK_REPLY_TIMEOUT + * JK_AJP_PROTOCOL_ERROR JK_HTTP_GATEWAY_TIME_OUT ? ajp_get_reply() returns JK_AJP_PROTOCOL_ERROR + * ??? JK_FATAL_ERROR JK_HTTP_GATEWAY_TIME_OUT JK_FALSE ajp_get_reply() returns something else + * Only if !op->recoverable + * ??? JK_FALSE JK_HTTP_SERVER_BUSY JK_TRUE ajp_get_reply() returns JK_FALSE + * Only if op->recoverable and no more ajp13/ajp14 direct retries + * JK_TRUE JK_HTTP_OK ? OK + */ +static int JK_METHOD ajp_service(jk_endpoint_t *e, + jk_ws_service_t *s, + jk_logger_t *l, int *is_error) +{ + int i; + int err = JK_TRUE; + ajp_operation_t oper; + ajp_operation_t *op = &oper; + ajp_endpoint_t *p; + ajp_worker_t *aw; + int log_error; + int rc = JK_UNSET; + char *msg = ""; + int retry_interval; + + JK_TRACE_ENTER(l); + + if (!e || !e->endpoint_private || !s || !is_error) { + JK_LOG_NULL_PARAMS(l); + if (is_error) + *is_error = JK_HTTP_SERVER_ERROR; + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + p = e->endpoint_private; + aw = p->worker; + + if (aw->sequence != aw->s->h.sequence) + jk_ajp_pull(aw, JK_FALSE, l); + + aw->s->used++; + + /* Set returned error to SERVER ERROR */ + *is_error = JK_HTTP_SERVER_ERROR; + + op->request = jk_b_new(&(p->pool)); + if (!op->request) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + if (jk_b_set_buffer_size(op->request, aw->max_packet_size)) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message buffer"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + jk_b_reset(op->request); + + op->reply = jk_b_new(&(p->pool)); + if (!op->reply) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + if (jk_b_set_buffer_size(op->reply, aw->max_packet_size)) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message buffer"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + + op->post = jk_b_new(&(p->pool)); + if (!op->post) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + if (jk_b_set_buffer_size(op->post, aw->max_packet_size)) { + jk_log(l, JK_LOG_ERROR, + "Failed allocating AJP message buffer"); + JK_TRACE_EXIT(l); + return JK_SERVER_ERROR; + } + jk_b_reset(op->post); + + /* Set returned error to OK */ + *is_error = JK_HTTP_OK; + + op->recoverable = JK_TRUE; + op->uploadfd = -1; /* not yet used, later ;) */ + + p->left_bytes_to_send = s->content_length; + p->reuse = JK_FALSE; + p->hard_close = JK_FALSE; + + s->secret = aw->secret; + + /* + * We get here initial request (in op->request) + */ + if (!ajp_marshal_into_msgb(op->request, s, l, p)) { + *is_error = JK_HTTP_REQUEST_TOO_LARGE; + jk_log(l, JK_LOG_INFO, + "Creating AJP message failed, " + "without recovery"); + aw->s->client_errors++; + JK_TRACE_EXIT(l); + return JK_CLIENT_ERROR; + } + + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, "processing %s with %d retries", + aw->name, aw->retries); + } + aw->s->busy++; + if (aw->s->state == JK_AJP_STATE_ERROR) + aw->s->state = JK_AJP_STATE_PROBE; + if (aw->s->busy > aw->s->max_busy) + aw->s->max_busy = aw->s->busy; + retry_interval = p->worker->retry_interval; + for (i = 0; i < aw->retries; i++) { + /* Reset reply message buffer for each retry */ + jk_b_reset(op->reply); + + /* + * ajp_send_request() already locally handles + * reconnecting and broken connection detection. + * So if we already failed in it, wait a bit before + * retrying the same backend. + */ + if (i > 0 && retry_interval >= 0) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "retry %d, sleeping for %d ms before retrying", + i, retry_interval); + jk_sleep(retry_interval); + /* Pull shared memory if something changed during sleep */ + if (aw->sequence != aw->s->h.sequence) + jk_ajp_pull(aw, JK_FALSE, l); + } + /* + * We're using op->request which hold initial request + * if Tomcat is stopped or restarted, we will pass op->request + * to next valid tomcat. + */ + log_error = JK_TRUE; + rc = JK_UNSET; + msg = ""; + err = ajp_send_request(e, s, l, p, op); + e->recoverable = op->recoverable; + if (err == JK_CLIENT_RD_ERROR) { + *is_error = JK_HTTP_BAD_REQUEST; + msg = "because of client read error"; + aw->s->client_errors++; + rc = JK_CLIENT_ERROR; + log_error = JK_FALSE; + e->recoverable = JK_FALSE; + /* Ajp message set reuse to TRUE in END_REQUEST message + * However due to client bad request if the recovery + * RECOVER_ABORT_IF_CLIENTERROR is set the physical connection + * will be closed and application in Tomcat can catch that + * generated exception, knowing the client aborted the + * connection. This AJP protocol limitation, where we + * should actually send some packet informing the backend + * that client broke the connection in a middle of + * request/response cycle. + */ + if (aw->recovery_opts & RECOVER_ABORT_IF_CLIENTERROR) { + /* Mark the endpoint for shutdown */ + p->reuse = JK_FALSE; + p->hard_close = JK_TRUE; + } + } + else if (err == JK_FATAL_ERROR) { + *is_error = JK_HTTP_SERVER_BUSY; + msg = "because of error during request sending"; + rc = err; + if (!op->recoverable) { + *is_error = JK_HTTP_SERVER_ERROR; + msg = "because of protocol error during request sending"; + } + } + else if (err == JK_TRUE && op->recoverable) { + /* Up to there we can recover */ + + err = ajp_get_reply(e, s, l, p, op); + e->recoverable = op->recoverable; + if (err == JK_TRUE) { + *is_error = JK_HTTP_OK; + /* Done with the request */ + ajp_update_stats(e, aw, JK_TRUE, l); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + + if (err == JK_CLIENT_RD_ERROR) { + *is_error = JK_HTTP_BAD_REQUEST; + msg = "because of client read error"; + aw->s->client_errors++; + rc = JK_CLIENT_ERROR; + log_error = JK_FALSE; + e->recoverable = JK_FALSE; + op->recoverable = JK_FALSE; + if (aw->recovery_opts & RECOVER_ABORT_IF_CLIENTERROR) { + /* Mark the endpoint for shutdown */ + p->reuse = JK_FALSE; + p->hard_close = JK_TRUE; + } + } + else if (err == JK_CLIENT_WR_ERROR) { + /* XXX: Is this correct to log this as 200? */ + *is_error = JK_HTTP_OK; + msg = "because of client write error"; + aw->s->client_errors++; + rc = JK_CLIENT_ERROR; + log_error = JK_FALSE; + e->recoverable = JK_FALSE; + op->recoverable = JK_FALSE; + if (aw->recovery_opts & RECOVER_ABORT_IF_CLIENTERROR) { + /* Mark the endpoint for shutdown */ + p->reuse = JK_FALSE; + p->hard_close = JK_TRUE; + } + } + else if (err == JK_FATAL_ERROR) { + *is_error = JK_HTTP_SERVER_ERROR; + msg = "because of server error"; + rc = err; + } + else if (err == JK_REPLY_TIMEOUT) { + *is_error = JK_HTTP_GATEWAY_TIME_OUT; + msg = "because of reply timeout"; + aw->s->reply_timeouts++; + rc = err; + } + else if (err == JK_STATUS_ERROR || err == JK_STATUS_FATAL_ERROR) { + *is_error = JK_HTTP_SERVER_BUSY; + msg = "because of response status"; + rc = err; + } + else if (err == JK_AJP_PROTOCOL_ERROR) { + *is_error = JK_HTTP_BAD_GATEWAY; + msg = "because of protocol error"; + rc = err; + } + /* This should only be the cases err == JK_FALSE */ + else { + /* if we can't get reply, check if unrecoverable flag was set + * if is_recoverable_error is cleared, we have started + * receiving upload data and we must consider that + * operation is no more recoverable + */ + *is_error = JK_HTTP_BAD_GATEWAY; + msg = ""; + rc = JK_FALSE; + } + } + else { + /* XXX: this should never happen: + * ajp_send_request() never returns JK_TRUE if !op->recoverable. + * and all other return values have already been handled. + */ + e->recoverable = JK_FALSE; + *is_error = JK_HTTP_SERVER_ERROR; + msg = "because of an unknown reason"; + rc = JK_FATAL_ERROR; + jk_log(l, JK_LOG_ERROR, + "(%s) unexpected condition err=%d recoverable=%d", + aw->name, err, op->recoverable); + } + if (!op->recoverable && log_error == JK_TRUE) { + jk_log(l, JK_LOG_ERROR, + "(%s) sending request to tomcat failed (unrecoverable), " + "%s " + "(attempt=%d)", + aw->name, msg, i + 1); + } + else { + jk_log(l, JK_LOG_INFO, + "(%s) sending request to tomcat failed (%srecoverable), " + "%s " + "(attempt=%d)", + aw->name, + op->recoverable ? "" : "un", + msg, i + 1); + } + if (!op->recoverable) { + ajp_update_stats(e, aw, rc, l); + JK_TRACE_EXIT(l); + return rc; + } + /* Get another connection from the pool and try again. + * Note: All sockets are probably closed already. + */ + ajp_next_connection(p, l); + } + /* Log the error only once per failed request. */ + jk_log(l, JK_LOG_ERROR, + "(%s) connecting to tomcat failed.", + aw->name); + + ajp_update_stats(e, aw, rc, l); + JK_TRACE_EXIT(l); + return rc; +} + +/* + * Validate the worker (ajp13/ajp14) + */ + +int ajp_validate(jk_worker_t *pThis, + jk_map_t *props, + jk_worker_env_t *we, jk_logger_t *l, int proto) +{ + int port; + const char *host; + + JK_TRACE_ENTER(l); + + if (proto == AJP13_PROTO) { + port = AJP13_DEF_PORT; + host = AJP13_DEF_HOST; + } + else if (proto == AJP14_PROTO) { + port = AJP14_DEF_PORT; + host = AJP14_DEF_HOST; + } + else { + jk_log(l, JK_LOG_ERROR, + "unknown protocol %d", proto); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + if (pThis && pThis->worker_private) { + ajp_worker_t *p = pThis->worker_private; + p->port = jk_get_worker_port(props, p->name, port); + if (!host) { + host = "undefined"; + } + strncpy(p->host, jk_get_worker_host(props, p->name, host), JK_SHM_STR_SIZ); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s contact is '%s:%d'", + p->name, p->host, p->port); + /* Copy the contact to shm */ + strncpy(p->s->host, p->host, JK_SHM_STR_SIZ); + p->s->port = p->port; + p->s->addr_sequence = p->addr_sequence = 0; + /* Resolve if port > 0. + */ + if (p->port > 0) { + if (jk_resolve(p->host, p->port, &p->worker_inet_addr, we->pool, l)) { + JK_TRACE_EXIT(l); + return JK_TRUE; + } + jk_log(l, JK_LOG_ERROR, + "worker %s can't resolve tomcat address %s", + p->name, p->host); + p->s->port = p->port = 0; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s contact is disabled", + p->name); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + else { + p->s->port = p->port = 0; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s contact is disabled", + p->name); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +static int ajp_create_endpoint_cache(ajp_worker_t *p, int proto, jk_logger_t *l) +{ + unsigned int i; + time_t now = time(NULL); + + JK_TRACE_ENTER(l); + + p->ep_cache = (ajp_endpoint_t **)calloc(1, sizeof(ajp_endpoint_t *) * + p->ep_cache_sz); + if (!p->ep_cache) { + JK_TRACE_EXIT(l); + return JK_FALSE; + } + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "setting connection pool size to %u with min %u and acquire timeout %d", + p->ep_cache_sz, p->ep_mincache_sz, p->cache_acquire_timeout); + for (i = 0; i < p->ep_cache_sz; i++) { + p->ep_cache[i] = (ajp_endpoint_t *)calloc(1, sizeof(ajp_endpoint_t)); + if (!p->ep_cache[i]) { + jk_log(l, JK_LOG_ERROR, + "allocating endpoint slot %d (errno=%d)", + i, errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + p->ep_cache[i]->sd = JK_INVALID_SOCKET; + p->ep_cache[i]->reuse = JK_FALSE; + p->ep_cache[i]->hard_close = JK_FALSE; + p->ep_cache[i]->last_access = now; + jk_open_pool(&(p->ep_cache[i]->pool), p->ep_cache[i]->buf, + sizeof(p->ep_cache[i]->buf)); + p->ep_cache[i]->worker = p; + p->ep_cache[i]->endpoint.endpoint_private = p->ep_cache[i]; + p->ep_cache[i]->proto = proto; + p->ep_cache[i]->endpoint.service = ajp_service; + p->ep_cache[i]->endpoint.done = ajp_done; + p->ep_cache[i]->last_op = JK_AJP13_END_RESPONSE; + p->ep_cache[i]->addr_sequence = 0; + } + + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +int ajp_init(jk_worker_t *pThis, + jk_map_t *props, jk_worker_env_t *we, jk_logger_t *l, int proto) +{ + int rc = JK_FALSE; + int cache; + /* + * start the connection cache + */ + JK_TRACE_ENTER(l); + + cache = jk_get_worker_def_cache_size(proto); + + if (pThis && pThis->worker_private) { + ajp_worker_t *p = pThis->worker_private; + p->worker.we = we; + p->ep_cache_sz = jk_get_worker_cache_size(props, p->name, cache); + p->ep_mincache_sz = jk_get_worker_cache_size_min(props, p->name, + (p->ep_cache_sz+1) / 2); + p->socket_timeout = + jk_get_worker_socket_timeout(props, p->name, AJP_DEF_SOCKET_TIMEOUT); + + p->socket_connect_timeout = + jk_get_worker_socket_connect_timeout(props, p->name, + p->socket_timeout * 1000); + + p->keepalive = + jk_get_worker_socket_keepalive(props, p->name, JK_FALSE); + + p->cache_timeout = + jk_get_worker_cache_timeout(props, p->name, + AJP_DEF_CACHE_TIMEOUT); + + p->ping_timeout = + jk_get_worker_ping_timeout(props, p->name, + AJP_DEF_PING_TIMEOUT); + p->ping_mode = + jk_get_worker_ping_mode(props, p->name, + AJP_CPING_NONE); + + p->connect_timeout = + jk_get_worker_connect_timeout(props, p->name, + AJP_DEF_CONNECT_TIMEOUT); + + p->prepost_timeout = + jk_get_worker_prepost_timeout(props, p->name, + AJP_DEF_PREPOST_TIMEOUT); + + if ((p->ping_mode & AJP_CPING_CONNECT) && + p->connect_timeout == AJP_DEF_CONNECT_TIMEOUT) + p->connect_timeout = p->ping_timeout; + + if ((p->ping_mode & AJP_CPING_PREPOST) && + p->prepost_timeout == AJP_DEF_PREPOST_TIMEOUT) + p->prepost_timeout = p->ping_timeout; + + p->conn_ping_interval = + jk_get_worker_conn_ping_interval(props, p->name, 0); + if ((p->ping_mode & AJP_CPING_INTERVAL) && + p->conn_ping_interval == 0) { + /* XXX: Ping timeout is in miliseconds + * and ping_interval is in seconds. + * Use 10 times larger value for ping interval + * (ping_timeout / 1000) * 10 + */ + p->conn_ping_interval = p->ping_timeout / 100; + } + p->reply_timeout = + jk_get_worker_reply_timeout(props, p->name, + AJP_DEF_REPLY_TIMEOUT); + + p->recovery_opts = + jk_get_worker_recovery_opts(props, p->name, + AJP_DEF_RECOVERY_OPTS); + + p->retries = + jk_get_worker_retries(props, p->name, + JK_RETRIES); + + p->max_packet_size = + jk_get_max_packet_size(props, p->name); + + p->socket_buf = + jk_get_worker_socket_buffer(props, p->name, p->max_packet_size); + + p->retry_interval = + jk_get_worker_retry_interval(props, p->name, + JK_SLEEP_DEF); + p->cache_acquire_timeout = jk_get_worker_cache_acquire_timeout(props, + p->name, p->retries * p->retry_interval); + p->http_status_fail_num = jk_get_worker_fail_on_status(props, p->name, + &p->http_status_fail[0], + JK_MAX_HTTP_STATUS_FAILS); + + if (p->retries < 1) { + jk_log(l, JK_LOG_INFO, + "number of retries must be greater then 1. Setting to default=%d", + JK_RETRIES); + p->retries = JK_RETRIES; + } + + p->maintain_time = jk_get_worker_maintain_time(props); + if(p->maintain_time < 0) + p->maintain_time = 0; + p->s->last_maintain_time = time(NULL); + p->s->last_reset = p->s->last_maintain_time; + + if (JK_IS_DEBUG_LEVEL(l)) { + + jk_log(l, JK_LOG_DEBUG, + "setting endpoint options:", + p->keepalive); + jk_log(l, JK_LOG_DEBUG, + "keepalive: %d", + p->keepalive); + + jk_log(l, JK_LOG_DEBUG, + "socket timeout: %d", + p->socket_timeout); + + jk_log(l, JK_LOG_DEBUG, + "socket connect timeout: %d", + p->socket_connect_timeout); + + jk_log(l, JK_LOG_DEBUG, + "buffer size: %d", + p->socket_buf); + + jk_log(l, JK_LOG_DEBUG, + "pool timeout: %d", + p->cache_timeout); + + jk_log(l, JK_LOG_DEBUG, + "ping timeout: %d", + p->ping_timeout); + + jk_log(l, JK_LOG_DEBUG, + "connect timeout: %d", + p->connect_timeout); + + jk_log(l, JK_LOG_DEBUG, + "reply timeout: %d", + p->reply_timeout); + + jk_log(l, JK_LOG_DEBUG, + "prepost timeout: %d", + p->prepost_timeout); + + jk_log(l, JK_LOG_DEBUG, + "recovery options: %d", + p->recovery_opts); + + jk_log(l, JK_LOG_DEBUG, + "retries: %d", + p->retries); + + jk_log(l, JK_LOG_DEBUG, + "max packet size: %d", + p->max_packet_size); + + jk_log(l, JK_LOG_DEBUG, + "retry interval: %d", + p->retry_interval); + } + /* + * Need to initialize secret here since we could return from inside + * of the following loop + */ + + p->secret = jk_get_worker_secret(props, p->name); + /* Initialize cache slots */ + JK_INIT_CS(&(p->cs), rc); + if (!rc) { + jk_log(l, JK_LOG_ERROR, + "creating thread lock (errno=%d)", + errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + if (!ajp_create_endpoint_cache(p, proto, l)) { + jk_log(l, JK_LOG_ERROR, + "allocating connection pool of size %u", + p->ep_cache_sz); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + rc = JK_TRUE; + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return rc; +} + +int JK_METHOD ajp_worker_factory(jk_worker_t **w, + const char *name, jk_logger_t *l) +{ + ajp_worker_t *aw; + + JK_TRACE_ENTER(l); + if (name == NULL || w == NULL) { + JK_LOG_NULL_PARAMS(l); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + aw = (ajp_worker_t *) calloc(1, sizeof(ajp_worker_t)); + if (!aw) { + jk_log(l, JK_LOG_ERROR, + "malloc of private_data failed"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + jk_open_pool(&aw->p, + aw->buf, + sizeof(jk_pool_atom_t) * TINY_POOL_SIZE); + + strncpy(aw->name, name, JK_SHM_STR_SIZ); + aw->login = NULL; + + aw->ep_cache_sz = 0; + aw->ep_cache = NULL; + aw->connect_retry_attempts = AJP_DEF_RETRY_ATTEMPTS; + aw->worker.worker_private = aw; + + aw->worker.maintain = ajp_maintain; + + aw->logon = NULL; + + *w = &aw->worker; + + aw->s = jk_shm_alloc_ajp_worker(&aw->p); + if (!aw->s) { + jk_close_pool(&aw->p); + free(aw); + jk_log(l, JK_LOG_ERROR, + "allocating ajp worker record from shared memory"); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + JK_TRACE_EXIT(l); + return JK_TRUE; +} + +int ajp_destroy(jk_worker_t **pThis, jk_logger_t *l, int proto) +{ + JK_TRACE_ENTER(l); + + if (pThis && *pThis && (*pThis)->worker_private) { + unsigned int i; + ajp_worker_t *aw = (*pThis)->worker_private; + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "up to %u endpoints to close", + aw->ep_cache_sz); + + for (i = 0; i < aw->ep_cache_sz; i++) { + if (aw->ep_cache[i]) + ajp_close_endpoint(aw->ep_cache[i], l); + } + free(aw->ep_cache); + JK_DELETE_CS(&(aw->cs), i); + + if (aw->login) { + /* take care of removing previously allocated data */ + if (aw->login->servlet_engine_name) + free(aw->login->servlet_engine_name); + + free(aw->login); + aw->login = NULL; + } + + jk_close_pool(&aw->p); + free(aw); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + + JK_LOG_NULL_PARAMS(l); + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +int JK_METHOD ajp_done(jk_endpoint_t **e, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (e && *e && (*e)->endpoint_private) { + ajp_endpoint_t *p = (*e)->endpoint_private; + int rc; + ajp_worker_t *w = p->worker; + + /* set last_access only if needed */ + if (w->cache_timeout > 0) + p->last_access = time(NULL); + if (w->s->addr_sequence != p->addr_sequence) { + p->reuse = JK_FALSE; + p->addr_sequence = w->s->addr_sequence; + } + ajp_reset_endpoint(p, l); + *e = NULL; + JK_ENTER_CS(&w->cs, rc); + if (rc) { + int i; + + for (i = w->ep_cache_sz - 1; i >= 0; i--) { + if (w->ep_cache[i] == NULL) { + w->ep_cache[i] = p; + break; + } + } + JK_LEAVE_CS(&w->cs, rc); + + if (i >= 0) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "recycling connection pool slot=%u for worker %s", + i, p->worker->name); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + /* This should never hapen because + * there is always free empty cache slot + */ + jk_log(l, JK_LOG_ERROR, + "could not find empty connection pool slot from %u for worker %s", + w->ep_cache_sz, w->name); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + jk_log(l, JK_LOG_ERROR, + "locking thread (errno=%d)", errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + + JK_LOG_NULL_PARAMS(l); + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +int ajp_get_endpoint(jk_worker_t *pThis, + jk_endpoint_t **je, jk_logger_t *l, int proto) +{ + JK_TRACE_ENTER(l); + + if (pThis && pThis->worker_private && je) { + ajp_worker_t *aw = pThis->worker_private; + ajp_endpoint_t *ae = NULL; + int rc; + int retry = 0; + + *je = NULL; + /* Loop until cache_acquire_timeout interval elapses */ + while ((retry * JK_SLEEP_DEF) < aw->cache_acquire_timeout) { + + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int slot; + /* Try to find connected socket cache entry */ + for (slot = 0; slot < aw->ep_cache_sz; slot++) { + if (aw->ep_cache[slot] && + IS_VALID_SOCKET(aw->ep_cache[slot]->sd)) { + ae = aw->ep_cache[slot]; + if (ae->reuse) { + aw->ep_cache[slot] = NULL; + break; + } + else { + /* XXX: We shouldn't have non reusable + * opened socket in the cache + */ + ajp_reset_endpoint(ae, l); + ae = NULL; + jk_log(l, JK_LOG_WARNING, + "closing non reusable pool slot=%d", slot); + } + } + } + if (!ae) { + /* No connected cache entry found. + * Use the first free one. + */ + for (slot = 0; slot < aw->ep_cache_sz; slot++) { + if (aw->ep_cache[slot]) { + ae = aw->ep_cache[slot]; + aw->ep_cache[slot] = NULL; + break; + } + } + } + JK_LEAVE_CS(&aw->cs, rc); + if (ae) { + if (aw->cache_timeout > 0) + ae->last_access = time(NULL); + *je = &ae->endpoint; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "acquired connection pool slot=%u after %d retries", + slot, retry); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + else { + retry++; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "could not get free endpoint for worker %s" + " (retry %d, sleeping for %d ms)", + aw->name, retry, JK_SLEEP_DEF); + jk_sleep(JK_SLEEP_DEF); + } + } + else { + jk_log(l, JK_LOG_ERROR, + "locking thread (errno=%d)", errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + + } + } + jk_log(l, JK_LOG_WARNING, + "Unable to get the free endpoint for worker %s from %u slots", + aw->name, aw->ep_cache_sz); + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +int JK_METHOD ajp_maintain(jk_worker_t *pThis, time_t mstarted, jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (pThis && pThis->worker_private) { + ajp_worker_t *aw = pThis->worker_private; + time_t now = mstarted; + int rc; + long delta; + + jk_shm_lock(); + + /* Now we check for global maintenance (once for all processes). + * Checking workers for idleness. + * Therefore we globally sync and we use a global timestamp. + * Since it's possible that we come here a few milliseconds + * before the interval has passed, we allow a little tolerance. + */ + delta = (long)difftime(mstarted, aw->s->last_maintain_time) + JK_AJP_MAINTAIN_TOLERANCE; + if (delta >= aw->maintain_time) { + aw->s->last_maintain_time = mstarted; + if (aw->s->state == JK_AJP_STATE_OK && + aw->s->used == aw->s->used_snapshot) + aw->s->state = JK_AJP_STATE_IDLE; + aw->s->used_snapshot = aw->s->used; + } + + jk_shm_unlock(); + + /* Do connection pool maintenance only if timeouts or keepalives are set */ + if (aw->cache_timeout <= 0 && + aw->conn_ping_interval <= 0) { + /* Nothing to do. */ + JK_TRACE_EXIT(l); + return JK_TRUE; + } + + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int n = 0, k = 0, cnt = 0; + int i; + unsigned int m, m_count = 0; + jk_sock_t *m_sock; + /* Count open slots */ + for (i = (int)aw->ep_cache_sz - 1; i >= 0; i--) { + if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd)) + cnt++; + } + m_sock = (jk_sock_t *)malloc((cnt + 1) * sizeof(jk_sock_t)); + /* Handle worker cache timeouts */ + if (aw->cache_timeout > 0) { + for (i = (int)aw->ep_cache_sz - 1; + i >= 0; i--) { + /* Skip the closed sockets */ + if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd)) { + int elapsed = (int)difftime(mstarted, aw->ep_cache[i]->last_access); + if (elapsed > aw->cache_timeout) { + time_t rt = 0; + n++; + if (JK_IS_DEBUG_LEVEL(l)) + rt = time(NULL); + aw->ep_cache[i]->reuse = JK_FALSE; + m_sock[m_count++] = aw->ep_cache[i]->sd; + aw->ep_cache[i]->sd = JK_INVALID_SOCKET; + ajp_reset_endpoint(aw->ep_cache[i], l); + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "cleaning pool slot=%d elapsed %d in %d", + i, elapsed, (int)(difftime(time(NULL), rt))); + } + } + if (cnt <= aw->ep_mincache_sz + n) { + if (JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, + "reached pool min size %u from %u cache slots", + aw->ep_mincache_sz, aw->ep_cache_sz); + } + break; + } + } + } + /* Handle worker connection keepalive */ + if (aw->conn_ping_interval > 0 && aw->ping_timeout > 0) { + for (i = (int)aw->ep_cache_sz - 1; i >= 0; i--) { + /* Skip the closed sockets */ + if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd)) { + int elapsed = (int)difftime(now, aw->ep_cache[i]->last_access); + if (elapsed > aw->conn_ping_interval) { + k++; + /* handle cping/cpong. + */ + if (ajp_handle_cping_cpong(aw->ep_cache[i], + aw->ping_timeout, l) == JK_FALSE) { + jk_log(l, JK_LOG_INFO, + "(%s) failed sending request, " + "socket %d keepalive cping/cpong " + "failure (errno=%d)", + aw->name, + aw->ep_cache[i]->sd, + aw->ep_cache[i]->last_errno); + aw->ep_cache[i]->reuse = JK_FALSE; + m_sock[m_count++] = aw->ep_cache[i]->sd; + aw->ep_cache[i]->sd = JK_INVALID_SOCKET; + ajp_reset_endpoint(aw->ep_cache[i], l); + } + else { + now = time(NULL); + aw->ep_cache[i]->last_access = now; + } + } + } + } + } + JK_LEAVE_CS(&aw->cs, rc); + /* Shutdown sockets outside of the lock. + * This has benefits only if maintain was + * called from the watchdog thread. + */ + for (m = 0; m < m_count; m++) { + jk_shutdown_socket(m_sock[m], l); + } + free(m_sock); + if (n && JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "recycled %u sockets in %d seconds from %u pool slots", + n, (int)(difftime(time(NULL), mstarted)), + aw->ep_cache_sz); + if (k && JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "pinged %u sockets in %d seconds from %u pool slots", + k, (int)(difftime(time(NULL), mstarted)), + aw->ep_cache_sz); + JK_TRACE_EXIT(l); + return JK_TRUE; + } + else { + jk_log(l, JK_LOG_ERROR, + "locking thread (errno=%d)", + errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + + } + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return JK_FALSE; +} + +int ajp_has_endpoint(jk_worker_t *pThis, + jk_logger_t *l) +{ + JK_TRACE_ENTER(l); + + if (pThis && pThis->worker_private) { + ajp_worker_t *aw = pThis->worker_private; + int rc; + + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int slot; + /* Try to find connected socket cache entry */ + for (slot = 0; slot < aw->ep_cache_sz; slot++) { + if (aw->ep_cache[slot]) { + JK_LEAVE_CS(&aw->cs, rc); + return JK_TRUE; + } + } + JK_LEAVE_CS(&aw->cs, rc); + } + else { + jk_log(l, JK_LOG_ERROR, + "locking thread (errno=%d)", errno); + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + else { + JK_LOG_NULL_PARAMS(l); + } + + JK_TRACE_EXIT(l); + return JK_FALSE; +}