// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef RGW_PROCESS_H #define RGW_PROCESS_H #include "rgw_common.h" #include "rgw_rados.h" #include "rgw_acl.h" #include "rgw_auth_registry.h" #include "rgw_user.h" #include "rgw_op.h" #include "rgw_rest.h" #include "include/assert.h" #include "common/WorkQueue.h" #include "common/Throttle.h" #include #if !defined(dout_subsys) #define dout_subsys ceph_subsys_rgw #define def_dout_subsys #endif #define dout_context g_ceph_context extern void signal_shutdown(); struct RGWProcessEnv { RGWRados *store; RGWREST *rest; OpsLogSocket *olog; int port; std::string uri_prefix; std::shared_ptr auth_registry; }; class RGWFrontendConfig; class RGWProcess { deque m_req_queue; protected: CephContext *cct; RGWRados* store; rgw_auth_registry_ptr_t auth_registry; OpsLogSocket* olog; ThreadPool m_tp; Throttle req_throttle; RGWREST* rest; RGWFrontendConfig* conf; int sock_fd; std::string uri_prefix; struct RGWWQ : public ThreadPool::WorkQueue { RGWProcess* process; RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp) : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), process(p) {} bool _enqueue(RGWRequest* req) override { process->m_req_queue.push_back(req); perfcounter->inc(l_rgw_qlen); dout(20) << "enqueued request req=" << hex << req << dec << dendl; _dump_queue(); return true; } void _dequeue(RGWRequest* req) override { ceph_abort(); } bool _empty() override { return process->m_req_queue.empty(); } RGWRequest* _dequeue() override { if (process->m_req_queue.empty()) return NULL; RGWRequest *req = process->m_req_queue.front(); process->m_req_queue.pop_front(); dout(20) << "dequeued request req=" << hex << req << dec << dendl; _dump_queue(); perfcounter->inc(l_rgw_qlen, -1); return req; } using ThreadPool::WorkQueue::_process; void _process(RGWRequest *req, ThreadPool::TPHandle &) override { perfcounter->inc(l_rgw_qactive); process->handle_request(req); process->req_throttle.put(1); perfcounter->inc(l_rgw_qactive, -1); } void _dump_queue(); void _clear() override { assert(process->m_req_queue.empty()); } } req_wq; public: RGWProcess(CephContext* const cct, RGWProcessEnv* const pe, const int num_threads, RGWFrontendConfig* const conf) : cct(cct), store(pe->store), auth_registry(pe->auth_registry), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads), req_throttle(cct, "rgw_ops", num_threads * 2), rest(pe->rest), conf(conf), sock_fd(-1), uri_prefix(pe->uri_prefix), req_wq(this, g_conf->rgw_op_thread_timeout, g_conf->rgw_op_thread_suicide_timeout, &m_tp) { } virtual ~RGWProcess() = default; virtual void run() = 0; virtual void handle_request(RGWRequest *req) = 0; void pause() { m_tp.pause(); } void unpause_with_new_config(RGWRados* const store, rgw_auth_registry_ptr_t auth_registry) { this->store = store; this->auth_registry = std::move(auth_registry); m_tp.unpause(); } void close_fd() { if (sock_fd >= 0) { ::close(sock_fd); sock_fd = -1; } } }; /* RGWProcess */ class RGWFCGXProcess : public RGWProcess { int max_connections; public: /* have a bit more connections than threads so that requests are * still accepted even if we're still processing older requests */ RGWFCGXProcess(CephContext* const cct, RGWProcessEnv* const pe, const int num_threads, RGWFrontendConfig* const conf) : RGWProcess(cct, pe, num_threads, conf), max_connections(num_threads + (num_threads >> 3)) { } void run() override; void handle_request(RGWRequest* req) override; }; class RGWProcessControlThread : public Thread { RGWProcess *pprocess; public: RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} void *entry() override { pprocess->run(); return NULL; } }; class RGWLoadGenProcess : public RGWProcess { RGWAccessKey access_key; public: RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads, RGWFrontendConfig* _conf) : RGWProcess(cct, pe, num_threads, _conf) {} void run() override; void checkpoint(); void handle_request(RGWRequest* req) override; void gen_request(const string& method, const string& resource, int content_length, std::atomic* fail_flag); void set_access_key(RGWAccessKey& key) { access_key = key; } }; /* process stream request */ extern int process_request(RGWRados* store, RGWREST* rest, RGWRequest* req, const std::string& frontend_prefix, const rgw_auth_registry_t& auth_registry, RGWRestfulIO* client_io, OpsLogSocket* olog); extern int rgw_process_authenticated(RGWHandler_REST* handler, RGWOp*& op, RGWRequest* req, req_state* s, bool skip_retarget = false); #if defined(def_dout_subsys) #undef def_dout_subsys #undef dout_subsys #endif #undef dout_context #endif /* RGW_PROCESS_H */