X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_http_client.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_http_client.h;h=04ffb80c90e6afd13ba4989d13a014c73c62860d;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_http_client.h b/src/ceph/src/rgw/rgw_http_client.h new file mode 100644 index 0000000..04ffb80 --- /dev/null +++ b/src/ceph/src/rgw/rgw_http_client.h @@ -0,0 +1,278 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RGW_HTTP_CLIENT_H +#define CEPH_RGW_HTTP_CLIENT_H + +#include "common/RWLock.h" +#include "common/Cond.h" +#include "rgw_common.h" +#include "rgw_string.h" + +#include + +using param_pair_t = pair; +using param_vec_t = vector; + +struct rgw_http_req_data; + +class RGWHTTPClient +{ + friend class RGWHTTPManager; + + bufferlist send_bl; + bufferlist::iterator send_iter; + size_t send_len; + bool has_send_len; + long http_status; + + rgw_http_req_data *req_data; + + void *user_info; + + string last_method; + string last_url; + bool verify_ssl; // Do not validate self signed certificates, default to false + + std::atomic stopped { 0 }; + +protected: + CephContext *cct; + param_vec_t headers; + + int init_request(const char *method, + const char *url, + rgw_http_req_data *req_data, + bool send_data_hint = false); + + virtual int receive_header(void *ptr, size_t len) { + return 0; + } + virtual int receive_data(void *ptr, size_t len) { + return 0; + } + virtual int send_data(void *ptr, size_t len) { + return 0; + } + + /* Callbacks for libcurl. */ + static size_t simple_receive_http_header(void *ptr, + size_t size, + size_t nmemb, + void *_info); + static size_t receive_http_header(void *ptr, + size_t size, + size_t nmemb, + void *_info); + + static size_t simple_receive_http_data(void *ptr, + size_t size, + size_t nmemb, + void *_info); + static size_t receive_http_data(void *ptr, + size_t size, + size_t nmemb, + void *_info); + + static size_t simple_send_http_data(void *ptr, + size_t size, + size_t nmemb, + void *_info); + static size_t send_http_data(void *ptr, + size_t size, + size_t nmemb, + void *_info); +public: + static const long HTTP_STATUS_NOSTATUS = 0; + static const long HTTP_STATUS_UNAUTHORIZED = 401; + static const long HTTP_STATUS_NOTFOUND = 404; + + virtual ~RGWHTTPClient(); + explicit RGWHTTPClient(CephContext *cct) + : send_len(0), + has_send_len(false), + http_status(HTTP_STATUS_NOSTATUS), + req_data(nullptr), + user_info(nullptr), + verify_ssl(cct->_conf->rgw_verify_ssl), + cct(cct) { + } + + void set_user_info(void *info) { + user_info = info; + } + + void *get_user_info() { + return user_info; + } + + void append_header(const string& name, const string& val) { + headers.push_back(pair(name, val)); + } + + void set_send_length(size_t len) { + send_len = len; + has_send_len = true; + } + + + long get_http_status() const { + return http_status; + } + + void set_verify_ssl(bool flag) { + verify_ssl = flag; + } + + int process(const char *method, const char *url); + int process(const char *url) { return process("GET", url); } + + int wait(); + rgw_http_req_data *get_req_data() { return req_data; } + + string to_str(); + + int get_req_retcode(); +}; + + +class RGWHTTPHeadersCollector : public RGWHTTPClient { +public: + typedef std::string header_name_t; + typedef std::string header_value_t; + typedef std::set header_spec_t; + + RGWHTTPHeadersCollector(CephContext * const cct, + const header_spec_t relevant_headers) + : RGWHTTPClient(cct), + relevant_headers(relevant_headers) { + } + + std::map get_headers() const { + return found_headers; + } + + /* Throws std::out_of_range */ + const header_value_t& get_header_value(const header_name_t& name) const { + return found_headers.at(name); + } + +protected: + int receive_header(void *ptr, size_t len) override; + + int receive_data(void *ptr, size_t len) override { + return 0; + } + + int send_data(void *ptr, size_t len) override { + return 0; + } + +private: + const std::set relevant_headers; + std::map found_headers; +}; + + +class RGWHTTPTransceiver : public RGWHTTPHeadersCollector { + bufferlist * const read_bl; + std::string post_data; + size_t post_data_index; + +public: + RGWHTTPTransceiver(CephContext * const cct, + bufferlist * const read_bl, + const header_spec_t intercept_headers = {}) + : RGWHTTPHeadersCollector(cct, intercept_headers), + read_bl(read_bl), + post_data_index(0) { + } + + RGWHTTPTransceiver(CephContext * const cct, + bufferlist * const read_bl, + const bool verify_ssl, + const header_spec_t intercept_headers = {}) + : RGWHTTPHeadersCollector(cct, intercept_headers), + read_bl(read_bl), + post_data_index(0) { + set_verify_ssl(verify_ssl); + } + + void set_post_data(const std::string& _post_data) { + this->post_data = _post_data; + } + +protected: + int send_data(void* ptr, size_t len) override; + + int receive_data(void *ptr, size_t len) override { + read_bl->append((char *)ptr, len); + return 0; + } +}; + +typedef RGWHTTPTransceiver RGWPostHTTPData; + + +class RGWCompletionManager; + +class RGWHTTPManager { + CephContext *cct; + RGWCompletionManager *completion_mgr; + void *multi_handle; + bool is_threaded; + std::atomic going_down { 0 }; + std::atomic is_stopped { 0 }; + + RWLock reqs_lock; + map reqs; + list unregistered_reqs; + map complete_reqs; + int64_t num_reqs; + int64_t max_threaded_req; + int thread_pipe[2]; + + void register_request(rgw_http_req_data *req_data); + void complete_request(rgw_http_req_data *req_data); + void _complete_request(rgw_http_req_data *req_data); + void unregister_request(rgw_http_req_data *req_data); + void _unlink_request(rgw_http_req_data *req_data); + void unlink_request(rgw_http_req_data *req_data); + void finish_request(rgw_http_req_data *req_data, int r); + void _finish_request(rgw_http_req_data *req_data, int r); + int link_request(rgw_http_req_data *req_data); + + void manage_pending_requests(); + + class ReqsThread : public Thread { + RGWHTTPManager *manager; + + public: + ReqsThread(RGWHTTPManager *_m) : manager(_m) {} + void *entry() override; + }; + + ReqsThread *reqs_thread; + + void *reqs_thread_entry(); + + int signal_thread(); + +public: + RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL); + ~RGWHTTPManager(); + + int set_threaded(); + void stop(); + + int add_request(RGWHTTPClient *client, const char *method, const char *url, + bool send_data_hint = false); + int remove_request(RGWHTTPClient *client); + + /* only for non threaded case */ + int process_requests(bool wait_for_data, bool *done); + + int complete_requests(); +}; + +#endif