X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_process.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_process.h;h=3aaeaff61afb818664a5f8f2878485d3927d61a6;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_process.h b/src/ceph/src/rgw/rgw_process.h new file mode 100644 index 0000000..3aaeaff --- /dev/null +++ b/src/ceph/src/rgw/rgw_process.h @@ -0,0 +1,213 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RGW_PROCESS_H +#define RGW_PROCESS_H + +#include "rgw_common.h" +#include "rgw_rados.h" +#include "rgw_acl.h" +#include "rgw_auth_registry.h" +#include "rgw_user.h" +#include "rgw_op.h" +#include "rgw_rest.h" + +#include "include/assert.h" + +#include "common/WorkQueue.h" +#include "common/Throttle.h" + +#include + +#if !defined(dout_subsys) +#define dout_subsys ceph_subsys_rgw +#define def_dout_subsys +#endif + +#define dout_context g_ceph_context + +extern void signal_shutdown(); + +struct RGWProcessEnv { + RGWRados *store; + RGWREST *rest; + OpsLogSocket *olog; + int port; + std::string uri_prefix; + std::shared_ptr auth_registry; +}; + +class RGWFrontendConfig; + +class RGWProcess { + deque m_req_queue; +protected: + CephContext *cct; + RGWRados* store; + rgw_auth_registry_ptr_t auth_registry; + OpsLogSocket* olog; + ThreadPool m_tp; + Throttle req_throttle; + RGWREST* rest; + RGWFrontendConfig* conf; + int sock_fd; + std::string uri_prefix; + + struct RGWWQ : public ThreadPool::WorkQueue { + RGWProcess* process; + RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp) + : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, + tp), process(p) {} + + bool _enqueue(RGWRequest* req) override { + process->m_req_queue.push_back(req); + perfcounter->inc(l_rgw_qlen); + dout(20) << "enqueued request req=" << hex << req << dec << dendl; + _dump_queue(); + return true; + } + + void _dequeue(RGWRequest* req) override { + ceph_abort(); + } + + bool _empty() override { + return process->m_req_queue.empty(); + } + + RGWRequest* _dequeue() override { + if (process->m_req_queue.empty()) + return NULL; + RGWRequest *req = process->m_req_queue.front(); + process->m_req_queue.pop_front(); + dout(20) << "dequeued request req=" << hex << req << dec << dendl; + _dump_queue(); + perfcounter->inc(l_rgw_qlen, -1); + return req; + } + + using ThreadPool::WorkQueue::_process; + + void _process(RGWRequest *req, ThreadPool::TPHandle &) override { + perfcounter->inc(l_rgw_qactive); + process->handle_request(req); + process->req_throttle.put(1); + perfcounter->inc(l_rgw_qactive, -1); + } + + void _dump_queue(); + + void _clear() override { + assert(process->m_req_queue.empty()); + } + } req_wq; + +public: + RGWProcess(CephContext* const cct, + RGWProcessEnv* const pe, + const int num_threads, + RGWFrontendConfig* const conf) + : cct(cct), + store(pe->store), + auth_registry(pe->auth_registry), + olog(pe->olog), + m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads), + req_throttle(cct, "rgw_ops", num_threads * 2), + rest(pe->rest), + conf(conf), + sock_fd(-1), + uri_prefix(pe->uri_prefix), + req_wq(this, g_conf->rgw_op_thread_timeout, + g_conf->rgw_op_thread_suicide_timeout, &m_tp) { + } + + virtual ~RGWProcess() = default; + + virtual void run() = 0; + virtual void handle_request(RGWRequest *req) = 0; + + void pause() { + m_tp.pause(); + } + + void unpause_with_new_config(RGWRados* const store, + rgw_auth_registry_ptr_t auth_registry) { + this->store = store; + this->auth_registry = std::move(auth_registry); + m_tp.unpause(); + } + + void close_fd() { + if (sock_fd >= 0) { + ::close(sock_fd); + sock_fd = -1; + } + } +}; /* RGWProcess */ + +class RGWFCGXProcess : public RGWProcess { + int max_connections; +public: + + /* have a bit more connections than threads so that requests are + * still accepted even if we're still processing older requests */ + RGWFCGXProcess(CephContext* const cct, + RGWProcessEnv* const pe, + const int num_threads, + RGWFrontendConfig* const conf) + : RGWProcess(cct, pe, num_threads, conf), + max_connections(num_threads + (num_threads >> 3)) { + } + + void run() override; + void handle_request(RGWRequest* req) override; +}; + +class RGWProcessControlThread : public Thread { + RGWProcess *pprocess; +public: + RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} + + void *entry() override { + pprocess->run(); + return NULL; + } +}; + +class RGWLoadGenProcess : public RGWProcess { + RGWAccessKey access_key; +public: + RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads, + RGWFrontendConfig* _conf) : + RGWProcess(cct, pe, num_threads, _conf) {} + void run() override; + void checkpoint(); + void handle_request(RGWRequest* req) override; + void gen_request(const string& method, const string& resource, + int content_length, std::atomic* fail_flag); + + void set_access_key(RGWAccessKey& key) { access_key = key; } +}; + +/* process stream request */ +extern int process_request(RGWRados* store, + RGWREST* rest, + RGWRequest* req, + const std::string& frontend_prefix, + const rgw_auth_registry_t& auth_registry, + RGWRestfulIO* client_io, + OpsLogSocket* olog); + +extern int rgw_process_authenticated(RGWHandler_REST* handler, + RGWOp*& op, + RGWRequest* req, + req_state* s, + bool skip_retarget = false); + +#if defined(def_dout_subsys) +#undef def_dout_subsys +#undef dout_subsys +#endif +#undef dout_context + +#endif /* RGW_PROCESS_H */