X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_http_client.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_http_client.cc;h=45c3405ecd5f97a0a2dc98b67116e63c9a0adb44;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_http_client.cc b/src/ceph/src/rgw/rgw_http_client.cc new file mode 100644 index 0000000..45c3405 --- /dev/null +++ b/src/ceph/src/rgw/rgw_http_client.cc @@ -0,0 +1,1015 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/compat.h" + +#include + +#include +#include +#include + +#include "rgw_common.h" +#include "rgw_http_client.h" +#include "rgw_http_errors.h" +#include "common/RefCountedObj.h" + +#include "rgw_coroutine.h" + +#include + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +struct rgw_http_req_data : public RefCountedObject { + CURL *easy_handle; + curl_slist *h; + uint64_t id; + int ret; + std::atomic done = { false }; + RGWHTTPClient *client; + void *user_info; + bool registered; + RGWHTTPManager *mgr; + char error_buf[CURL_ERROR_SIZE]; + + Mutex lock; + Cond cond; + + rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0), + client(nullptr), user_info(nullptr), registered(false), + mgr(NULL), lock("rgw_http_req_data::lock") { + memset(error_buf, 0, sizeof(error_buf)); + } + + int wait() { + Mutex::Locker l(lock); + cond.Wait(lock); + return ret; + } + + + void finish(int r) { + Mutex::Locker l(lock); + ret = r; + if (easy_handle) + curl_easy_cleanup(easy_handle); + + if (h) + curl_slist_free_all(h); + + easy_handle = NULL; + h = NULL; + done = true; + cond.Signal(); + } + + bool is_done() { + return done; + } + + int get_retcode() { + Mutex::Locker l(lock); + return ret; + } + + RGWHTTPManager *get_manager() { + Mutex::Locker l(lock); + return mgr; + } +}; + +/* + * the simple set of callbacks will be called on RGWHTTPClient::process() + */ +/* Static methods - callbacks for libcurl. */ +size_t RGWHTTPClient::simple_receive_http_header(void * const ptr, + const size_t size, + const size_t nmemb, + void * const _info) +{ + RGWHTTPClient *client = static_cast(_info); + const size_t len = size * nmemb; + int ret = client->receive_header(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_header() returned ret=" + << ret << dendl; + } + + return len; +} + +size_t RGWHTTPClient::simple_receive_http_data(void * const ptr, + const size_t size, + const size_t nmemb, + void * const _info) +{ + RGWHTTPClient *client = static_cast(_info); + const size_t len = size * nmemb; + int ret = client->receive_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_data() returned ret=" + << ret << dendl; + } + + return len; +} + +size_t RGWHTTPClient::simple_send_http_data(void * const ptr, + const size_t size, + const size_t nmemb, + void * const _info) +{ + RGWHTTPClient *client = static_cast(_info); + int ret = client->send_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->send_data() returned ret=" + << ret << dendl; + } + + return ret; +} + +/* + * the following set of callbacks will be called either on RGWHTTPManager::process(), + * or via the RGWHTTPManager async processing. + */ +size_t RGWHTTPClient::receive_http_header(void * const ptr, + const size_t size, + const size_t nmemb, + void * const _info) +{ + rgw_http_req_data *req_data = static_cast(_info); + size_t len = size * nmemb; + + Mutex::Locker l(req_data->lock); + + if (!req_data->registered) { + return len; + } + + int ret = req_data->client->receive_header(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl; + } + + return len; +} + +size_t RGWHTTPClient::receive_http_data(void * const ptr, + const size_t size, + const size_t nmemb, + void * const _info) +{ + rgw_http_req_data *req_data = static_cast(_info); + size_t len = size * nmemb; + + Mutex::Locker l(req_data->lock); + + if (!req_data->registered) { + return len; + } + + int ret = req_data->client->receive_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; + } + + return len; +} + +size_t RGWHTTPClient::send_http_data(void * const ptr, + const size_t size, + const size_t nmemb, + void * const _info) +{ + rgw_http_req_data *req_data = static_cast(_info); + + Mutex::Locker l(req_data->lock); + + if (!req_data->registered) { + return 0; + } + + int ret = req_data->client->send_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; + } + + return ret; +} + +static curl_slist *headers_to_slist(param_vec_t& headers) +{ + curl_slist *h = NULL; + + param_vec_t::iterator iter; + for (iter = headers.begin(); iter != headers.end(); ++iter) { + pair& p = *iter; + string val = p.first; + + if (strncmp(val.c_str(), "HTTP_", 5) == 0) { + val = val.substr(5); + } + + /* we need to convert all underscores into dashes as some web servers forbid them + * in the http header field names + */ + for (size_t i = 0; i < val.size(); i++) { + if (val[i] == '_') { + val[i] = '-'; + } + } + + val.append(": "); + val.append(p.second); + h = curl_slist_append(h, val.c_str()); + } + + return h; +} + +static bool is_upload_request(const char *method) +{ + if (method == nullptr) { + return false; + } + return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0; +} + +/* + * process a single simple one off request, not going through RGWHTTPManager. Not using + * req_data. + */ +int RGWHTTPClient::process(const char *method, const char *url) +{ + int ret = 0; + CURL *curl_handle; + + char error_buf[CURL_ERROR_SIZE]; + + last_method = (method ? method : ""); + last_url = (url ? url : ""); + + curl_handle = curl_easy_init(); + + dout(20) << "sending request to " << url << dendl; + + curl_slist *h = headers_to_slist(headers); + + curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method); + curl_easy_setopt(curl_handle, CURLOPT_URL, url); + curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L); + curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header); + curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data); + curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this); + curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); + if (h) { + curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h); + } + curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data); + curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this); + if (is_upload_request(method)) { + curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); + } + if (has_send_len) { + curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); + } + if (!verify_ssl) { + curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L); + dout(20) << "ssl verification is set to off" << dendl; + } + + CURLcode status = curl_easy_perform(curl_handle); + if (status) { + dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl; + ret = -EINVAL; + } + curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status); + curl_easy_cleanup(curl_handle); + curl_slist_free_all(h); + + return ret; +} + +string RGWHTTPClient::to_str() +{ + string method_str = (last_method.empty() ? "" : last_method); + string url_str = (last_url.empty() ? "" : last_url); + return method_str + " " + url_str; +} + +int RGWHTTPClient::get_req_retcode() +{ + if (!req_data) { + return -EINVAL; + } + + return req_data->get_retcode(); +} + +/* + * init request, will be used later with RGWHTTPManager + */ +int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint) +{ + assert(!req_data); + _req_data->get(); + req_data = _req_data; + + CURL *easy_handle; + + easy_handle = curl_easy_init(); + + req_data->easy_handle = easy_handle; + + dout(20) << "sending request to " << url << dendl; + + curl_slist *h = headers_to_slist(headers); + + req_data->h = h; + + last_method = (method ? method : ""); + last_url = (url ? url : ""); + + curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method); + curl_easy_setopt(easy_handle, CURLOPT_URL, url); + curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); + curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); + curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data); + curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); + curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data); + curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf); + if (h) { + curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); + } + curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); + curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); + if (send_data_hint || is_upload_request(method)) { + curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); + } + if (has_send_len) { + curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); + } + if (!verify_ssl) { + curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L); + dout(20) << "ssl verification is set to off" << dendl; + } + curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); + + return 0; +} + +/* + * wait for async request to complete + */ +int RGWHTTPClient::wait() +{ + if (!req_data->is_done()) { + return req_data->wait(); + } + + return req_data->ret; +} + +RGWHTTPClient::~RGWHTTPClient() +{ + if (req_data) { + RGWHTTPManager *http_manager = req_data->get_manager(); + if (http_manager) { + http_manager->remove_request(this); + } + + req_data->put(); + } +} + + +int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len) +{ + const boost::string_ref header_line(static_cast(ptr), len); + + /* We're tokening the line that way due to backward compatibility. */ + const size_t sep_loc = header_line.find_first_of(" \t:"); + + if (boost::string_ref::npos == sep_loc) { + /* Wrongly formatted header? Just skip it. */ + return 0; + } + + header_name_t name(header_line.substr(0, sep_loc)); + if (0 == relevant_headers.count(name)) { + /* Not interested in this particular header. */ + return 0; + } + + const auto value_part = header_line.substr(sep_loc + 1); + + /* Skip spaces and tabs after the separator. */ + const size_t val_loc_s = value_part.find_first_not_of(' '); + const size_t val_loc_e = value_part.find_first_of("\r\n"); + + if (boost::string_ref::npos == val_loc_s || + boost::string_ref::npos == val_loc_e) { + /* Empty value case. */ + found_headers.emplace(name, header_value_t()); + } else { + found_headers.emplace(name, header_value_t( + value_part.substr(val_loc_s, val_loc_e - val_loc_s))); + } + + return 0; +} + +int RGWHTTPTransceiver::send_data(void* ptr, size_t len) +{ + int length_to_copy = 0; + if (post_data_index < post_data.length()) { + length_to_copy = min(post_data.length() - post_data_index, len); + memcpy(ptr, post_data.data() + post_data_index, length_to_copy); + post_data_index += length_to_copy; + } + return length_to_copy; +} + + +static int clear_signal(int fd) +{ + // since we're in non-blocking mode, we can try to read a lot more than + // one signal from signal_thread() to avoid later wakeups. non-blocking reads + // are also required to support the curl_multi_wait bug workaround + std::array buf; + int ret = ::read(fd, (void *)buf.data(), buf.size()); + if (ret < 0) { + ret = -errno; + return ret == -EAGAIN ? 0 : ret; // clear EAGAIN + } + return 0; +} + +#if HAVE_CURL_MULTI_WAIT + +static std::once_flag detect_flag; +static bool curl_multi_wait_bug_present = false; + +static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle, + int write_fd, int read_fd) +{ + int ret = 0; + + // write to write_fd so that read_fd becomes readable + uint32_t buf = 0; + ret = ::write(write_fd, &buf, sizeof(buf)); + if (ret < 0) { + ret = -errno; + ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl; + return ret; + } + + // pass read_fd in extra_fds for curl_multi_wait() + int num_fds; + struct curl_waitfd wait_fd; + + wait_fd.fd = read_fd; + wait_fd.events = CURL_WAIT_POLLIN; + wait_fd.revents = 0; + + ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds); + if (ret != CURLM_OK) { + ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; + return -EIO; + } + + // curl_multi_wait should flag revents when extra_fd is readable. if it + // doesn't, the bug is present and we can't rely on revents + if (wait_fd.revents == 0) { + curl_multi_wait_bug_present = true; + ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a " + "bug in curl_multi_wait(). enabling a workaround that may degrade " + "performance slightly." << dendl; + } + + return clear_signal(read_fd); +} + +static bool is_signaled(const curl_waitfd& wait_fd) +{ + if (wait_fd.fd < 0) { + // no fd to signal + return false; + } + + if (curl_multi_wait_bug_present) { + // we can't rely on revents, so we always return true if a wait_fd is given. + // this means we'll be trying a non-blocking read on this fd every time that + // curl_multi_wait() wakes up + return true; + } + + return wait_fd.revents > 0; +} + +static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) +{ + int num_fds; + struct curl_waitfd wait_fd; + + wait_fd.fd = signal_fd; + wait_fd.events = CURL_WAIT_POLLIN; + wait_fd.revents = 0; + + int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); + if (ret) { + ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; + return -EIO; + } + + if (is_signaled(wait_fd)) { + ret = clear_signal(signal_fd); + if (ret < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; + return ret; + } + } + return 0; +} + +#else + +static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) +{ + fd_set fdread; + fd_set fdwrite; + fd_set fdexcep; + int maxfd = -1; + + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); + + /* get file descriptors from the transfers */ + int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd); + if (ret) { + ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl; + return -EIO; + } + + if (signal_fd > 0) { + FD_SET(signal_fd, &fdread); + if (signal_fd >= maxfd) { + maxfd = signal_fd + 1; + } + } + + /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */ + uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms; +#define RGW_CURL_TIMEOUT 1000 + if (!to) + to = RGW_CURL_TIMEOUT; + struct timeval timeout; + timeout.tv_sec = to / 1000; + timeout.tv_usec = to % 1000; + + ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); + if (ret < 0) { + ret = -errno; + ldout(cct, 0) << "ERROR: select returned " << ret << dendl; + return ret; + } + + if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) { + ret = clear_signal(signal_fd); + if (ret < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; + return ret; + } + } + + return 0; +} + +#endif + +void *RGWHTTPManager::ReqsThread::entry() +{ + manager->reqs_thread_entry(); + return NULL; +} + +/* + * RGWHTTPManager has two modes of operation: threaded and non-threaded. + */ +RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), + completion_mgr(_cm), is_threaded(false), + reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), + reqs_thread(NULL) +{ + multi_handle = (void *)curl_multi_init(); + thread_pipe[0] = -1; + thread_pipe[1] = -1; +} + +RGWHTTPManager::~RGWHTTPManager() { + stop(); + if (multi_handle) + curl_multi_cleanup((CURLM *)multi_handle); +} + +void RGWHTTPManager::register_request(rgw_http_req_data *req_data) +{ + RWLock::WLocker rl(reqs_lock); + req_data->id = num_reqs; + req_data->registered = true; + reqs[num_reqs] = req_data; + num_reqs++; + ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; +} + +void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) +{ + RWLock::WLocker rl(reqs_lock); + req_data->get(); + req_data->registered = false; + unregistered_reqs.push_back(req_data); + ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; +} + +void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) +{ + RWLock::WLocker rl(reqs_lock); + _complete_request(req_data); +} + +void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) +{ + map::iterator iter = reqs.find(req_data->id); + if (iter != reqs.end()) { + reqs.erase(iter); + } + { + Mutex::Locker l(req_data->lock); + req_data->mgr = nullptr; + } + if (completion_mgr) { + completion_mgr->complete(NULL, req_data->user_info); + } + + req_data->put(); +} + +void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret) +{ + req_data->finish(ret); + complete_request(req_data); +} + +void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) +{ + req_data->finish(ret); + _complete_request(req_data); +} + +/* + * hook request to the curl multi handle + */ +int RGWHTTPManager::link_request(rgw_http_req_data *req_data) +{ + ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; + CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle); + if (mstatus) { + dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; + return -EIO; + } + return 0; +} + +/* + * unhook request from the curl multi handle, and finish request if it wasn't finished yet as + * there will be no more processing on this request + */ +void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) +{ + if (req_data->easy_handle) { + curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle); + } + if (!req_data->is_done()) { + _finish_request(req_data, -ECANCELED); + } +} + +void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) +{ + RWLock::WLocker wl(reqs_lock); + _unlink_request(req_data); +} + +void RGWHTTPManager::manage_pending_requests() +{ + reqs_lock.get_read(); + if (max_threaded_req == num_reqs && unregistered_reqs.empty()) { + reqs_lock.unlock(); + return; + } + reqs_lock.unlock(); + + RWLock::WLocker wl(reqs_lock); + + if (!unregistered_reqs.empty()) { + for (auto& r : unregistered_reqs) { + _unlink_request(r); + r->put(); + } + + unregistered_reqs.clear(); + } + + map::iterator iter = reqs.find(max_threaded_req); + + list > remove_reqs; + + for (; iter != reqs.end(); ++iter) { + rgw_http_req_data *req_data = iter->second; + int r = link_request(req_data); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to link http request" << dendl; + remove_reqs.push_back(std::make_pair(iter->second, r)); + } else { + max_threaded_req = iter->first + 1; + } + } + + for (auto piter : remove_reqs) { + rgw_http_req_data *req_data = piter.first; + int r = piter.second; + + _finish_request(req_data, r); + } +} + +int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint) +{ + rgw_http_req_data *req_data = new rgw_http_req_data; + + int ret = client->init_request(method, url, req_data, send_data_hint); + if (ret < 0) { + req_data->put(); + req_data = NULL; + return ret; + } + + req_data->mgr = this; + req_data->client = client; + req_data->user_info = client->get_user_info(); + + register_request(req_data); + + if (!is_threaded) { + ret = link_request(req_data); + if (ret < 0) { + req_data->put(); + req_data = NULL; + } + return ret; + } + ret = signal_thread(); + if (ret < 0) { + finish_request(req_data, ret); + } + + return ret; +} + +int RGWHTTPManager::remove_request(RGWHTTPClient *client) +{ + rgw_http_req_data *req_data = client->get_req_data(); + + if (!is_threaded) { + unlink_request(req_data); + return 0; + } + unregister_request(req_data); + int ret = signal_thread(); + if (ret < 0) { + return ret; + } + + return 0; +} + +/* + * the synchronous, non-threaded request processing method. + */ +int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) +{ + assert(!is_threaded); + + int still_running; + int mstatus; + + do { + if (wait_for_data) { + int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1); + if (ret < 0) { + return ret; + } + } + + mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); + switch (mstatus) { + case CURLM_OK: + case CURLM_CALL_MULTI_PERFORM: + break; + default: + dout(20) << "curl_multi_perform returned: " << mstatus << dendl; + return -EINVAL; + } + int msgs_left; + CURLMsg *msg; + while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { + if (msg->msg == CURLMSG_DONE) { + CURL *e = msg->easy_handle; + rgw_http_req_data *req_data; + curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); + + long http_status; + curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); + + int status = rgw_http_error_to_errno(http_status); + int result = msg->data.result; + finish_request(req_data, status); + switch (result) { + case CURLE_OK: + break; + default: + dout(20) << "ERROR: msg->data.result=" << result << dendl; + return -EIO; + } + } + } + } while (mstatus == CURLM_CALL_MULTI_PERFORM); + + *done = (still_running == 0); + + return 0; +} + +/* + * the synchronous, non-threaded request processing completion method. + */ +int RGWHTTPManager::complete_requests() +{ + bool done = false; + int ret; + do { + ret = process_requests(true, &done); + } while (!done && !ret); + + return ret; +} + +int RGWHTTPManager::set_threaded() +{ + int r = pipe(thread_pipe); + if (r < 0) { + r = -errno; + ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl; + return r; + } + + // enable non-blocking reads + r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK); + if (r < 0) { + r = -errno; + ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl; + TEMP_FAILURE_RETRY(::close(thread_pipe[0])); + TEMP_FAILURE_RETRY(::close(thread_pipe[1])); + return r; + } + +#ifdef HAVE_CURL_MULTI_WAIT + // on first initialization, use this pipe to detect whether we're using a + // buggy version of libcurl + std::call_once(detect_flag, detect_curl_multi_wait_bug, cct, + static_cast(multi_handle), + thread_pipe[1], thread_pipe[0]); +#endif + + is_threaded = true; + reqs_thread = new ReqsThread(this); + reqs_thread->create("http_manager"); + return 0; +} + +void RGWHTTPManager::stop() +{ + if (is_stopped) { + return; + } + + is_stopped = true; + + if (is_threaded) { + going_down = true; + signal_thread(); + reqs_thread->join(); + delete reqs_thread; + TEMP_FAILURE_RETRY(::close(thread_pipe[1])); + TEMP_FAILURE_RETRY(::close(thread_pipe[0])); + } +} + +int RGWHTTPManager::signal_thread() +{ + uint32_t buf = 0; + int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf)); + if (ret < 0) { + ret = -errno; + ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl; + return ret; + } + return 0; +} + +void *RGWHTTPManager::reqs_thread_entry() +{ + int still_running; + int mstatus; + + ldout(cct, 20) << __func__ << ": start" << dendl; + + while (!going_down) { + int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]); + if (ret < 0) { + dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; + return NULL; + } + + manage_pending_requests(); + + mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); + switch (mstatus) { + case CURLM_OK: + case CURLM_CALL_MULTI_PERFORM: + break; + default: + dout(10) << "curl_multi_perform returned: " << mstatus << dendl; + break; + } + int msgs_left; + CURLMsg *msg; + while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { + if (msg->msg == CURLMSG_DONE) { + int result = msg->data.result; + CURL *e = msg->easy_handle; + rgw_http_req_data *req_data; + curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); + curl_multi_remove_handle((CURLM *)multi_handle, e); + + long http_status; + curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); + + int status = rgw_http_error_to_errno(http_status); + if (result != CURLE_OK && http_status == 0) { + status = -EAGAIN; + } + int id = req_data->id; + finish_request(req_data, status); + switch (result) { + case CURLE_OK: + break; + default: + dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl; + break; + } + } + } + } + + + RWLock::WLocker rl(reqs_lock); + for (auto r : unregistered_reqs) { + _finish_request(r, -ECANCELED); + } + + unregistered_reqs.clear(); + + auto all_reqs = std::move(reqs); + for (auto iter : all_reqs) { + _finish_request(iter.second, -ECANCELED); + } + + reqs.clear(); + + if (completion_mgr) { + completion_mgr->go_down(); + } + + return 0; +} + +