1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include "rgw_common.h"
10 #include "rgw_auth_registry.h"
15 #include "include/assert.h"
17 #include "common/WorkQueue.h"
18 #include "common/Throttle.h"
// Define dout_subsys as ceph_subsys_rgw unless it is already defined.
// def_dout_subsys records that the definition was made here; it is tested
// again by the `#if defined(def_dout_subsys)` near the end of this header.
22 #if !defined(dout_subsys)
23 #define dout_subsys ceph_subsys_rgw
24 #define def_dout_subsys
// NOTE(review): presumably consumed by the dout() logging macro used in
// RGWWQ below -- confirm against the dout definition.
27 #define dout_context g_ceph_context
// Declaration only; the definition is not in this header excerpt.
29 extern void signal_shutdown();
// Environment handed to an RGWProcess at construction time: the RGWProcess
// constructor takes a pointer to this struct and reads the members below.
31 struct RGWProcessEnv {
// Used to initialise RGWProcess::uri_prefix.
36 std::string uri_prefix;
// Shared handle to the auth strategy registry; used to initialise
// RGWProcess::auth_registry.
37 std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
// Forward declaration; the lines visible in this excerpt only use pointers
// to RGWFrontendConfig.
40 class RGWFrontendConfig;
// Pending requests. RGWWQ::_enqueue() appends at the back and
// RGWWQ::_dequeue() removes from the front.
43 deque<RGWRequest*> m_req_queue;
// Initialised from RGWProcessEnv::auth_registry and replaced by
// unpause_with_new_config().
47 rgw_auth_registry_ptr_t auth_registry;
// Constructed with (cct, "rgw_ops", num_threads * 2). RGWWQ::_process()
// calls put(1) on it after each handled request.
50 Throttle req_throttle;
// Frontend configuration pointer.
// NOTE(review): presumably set from the constructor's `conf` argument; that
// initialiser is not visible in this excerpt.
52 RGWFrontendConfig* conf;
// Initialised from RGWProcessEnv::uri_prefix.
54 std::string uri_prefix;
// ThreadPool work queue of RGWRequest objects. Its storage is the owning
// process's m_req_queue, reached through the `process` pointer.
56 struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
// Passes the queue name "RGWWQ" and both timeouts to the base class; the
// remaining initialisers are not visible in this excerpt.
58 RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
59 : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
// Append req to the back of the process queue, increment the l_rgw_qlen
// counter and log the request pointer (in hex) at dout level 20.
// The return statement is not visible in this excerpt.
62 bool _enqueue(RGWRequest* req) override {
63 process->m_req_queue.push_back(req);
64 perfcounter->inc(l_rgw_qlen);
65 dout(20) << "enqueued request req=" << hex << req << dec << dendl;
// Overload that takes a specific request; its body is not visible in this
// excerpt.
70 void _dequeue(RGWRequest* req) override {
// True when the process queue holds no requests.
74 bool _empty() override {
75 return process->m_req_queue.empty();
// Remove and log the request at the front of the process queue, then
// decrement l_rgw_qlen. What the empty-queue branch returns is not visible
// in this excerpt.
78 RGWRequest* _dequeue() override {
79 if (process->m_req_queue.empty())
81 RGWRequest *req = process->m_req_queue.front();
82 process->m_req_queue.pop_front();
83 dout(20) << "dequeued request req=" << hex << req << dec << dendl;
85 perfcounter->inc(l_rgw_qlen, -1);
// Keep the base-class _process overloads visible; without this
// using-declaration the override below would hide them.
89 using ThreadPool::WorkQueue<RGWRequest>::_process;
// Handle one request. l_rgw_qactive is incremented before and decremented
// after the call to handle_request(), and one unit is returned to
// req_throttle once the request has been handled.
91 void _process(RGWRequest *req, ThreadPool::TPHandle &) override {
92 perfcounter->inc(l_rgw_qactive);
93 process->handle_request(req);
94 process->req_throttle.put(1);
95 perfcounter->inc(l_rgw_qactive, -1);
// Asserts that no requests are left in the process queue.
100 void _clear() override {
101 assert(process->m_req_queue.empty());
// Construct the process from its environment `pe`.
// The visible initialisers:
//   - take auth_registry and uri_prefix from `pe`;
//   - pass num_threads to the thread pool m_tp;
//   - construct req_throttle ("rgw_ops") with num_threads * 2;
//   - build req_wq on top of m_tp, with the op-thread timeout and suicide
//     timeout read from g_conf.
// Other initialisers are not visible in this excerpt.
106 RGWProcess(CephContext* const cct,
107 RGWProcessEnv* const pe,
108 const int num_threads,
109 RGWFrontendConfig* const conf)
112 auth_registry(pe->auth_registry),
114 m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
115 req_throttle(cct, "rgw_ops", num_threads * 2),
119 uri_prefix(pe->uri_prefix),
120 req_wq(this, g_conf->rgw_op_thread_timeout,
121 g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
// Virtual destructor: RGWProcess is a polymorphic base (see the pure virtual
// functions below).
124 virtual ~RGWProcess() = default;
// Pure virtual; implemented by each concrete process type.
126 virtual void run() = 0;
// Pure virtual; RGWWQ::_process() calls this for every request it handles.
127 virtual void handle_request(RGWRequest *req) = 0;
// Replace the stored auth registry by moving the by-value argument into
// this->auth_registry. How `store` is used, and any other steps this
// function takes, are not visible in this excerpt.
133 void unpause_with_new_config(RGWRados* const store,
134 rgw_auth_registry_ptr_t auth_registry) {
136 this->auth_registry = std::move(auth_registry);
// RGWProcess subclass that overrides handle_request().
148 class RGWFCGXProcess : public RGWProcess {
152 /* Allow somewhat more connections than worker threads so that new
153 * requests are still accepted while older requests are being processed. */
// max_connections is num_threads + (num_threads >> 3), i.e. num_threads plus
// one eighth of num_threads.
154 RGWFCGXProcess(CephContext* const cct,
155 RGWProcessEnv* const pe,
156 const int num_threads,
157 RGWFrontendConfig* const conf)
158 : RGWProcess(cct, pe, num_threads, conf),
159 max_connections(num_threads + (num_threads >> 3)) {
// Declared here; the definition is not in this header excerpt.
163 void handle_request(RGWRequest* req) override;
// Thread subclass that stores the RGWProcess pointer given to its
// constructor.
166 class RGWProcessControlThread : public Thread {
167 RGWProcess *pprocess;
// Stores _pprocess in pprocess; nothing else is done here.
169 RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
// Thread entry point override; its body is not visible in this excerpt.
171 void *entry() override {
// RGWProcess subclass that holds an RGWAccessKey and declares gen_request()
// in addition to the handle_request() override.
177 class RGWLoadGenProcess : public RGWProcess {
// Access key stored by set_access_key().
178 RGWAccessKey access_key;
// Forwards all arguments unchanged to the RGWProcess constructor.
180 RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
181 RGWFrontendConfig* _conf) :
182 RGWProcess(cct, pe, num_threads, _conf) {}
// Declared here; the definition is not in this header excerpt.
185 void handle_request(RGWRequest* req) override;
// Declared here; the definition is not in this header excerpt.
// fail_flag is a pointer to an atomic bool supplied by the caller.
186 void gen_request(const string& method, const string& resource,
187 int content_length, std::atomic<bool>* fail_flag);
// Copy `key` into access_key. The argument is only read, so it is taken by
// const reference; this also accepts const objects and temporaries.
189 void set_access_key(const RGWAccessKey& key) { access_key = key; }
192 /* Process a stream request. Declaration only; some parameters are not
 * visible in this excerpt. */
193 extern int process_request(RGWRados* store,
196 const std::string& frontend_prefix,
197 const rgw_auth_registry_t& auth_registry,
198 RGWRestfulIO* client_io,
// Declaration only; skip_retarget defaults to false when the caller omits
// it. The other parameters are not visible in this excerpt.
201 extern int rgw_process_authenticated(RGWHandler_REST* handler,
205 bool skip_retarget = false);
// Remove the def_dout_subsys marker if this header set it near the top.
// Anything else done inside this conditional is not visible in this excerpt.
207 #if defined(def_dout_subsys)
208 #undef def_dout_subsys
// NOTE(review): the trailing comment suggests this closes an RGW_PROCESS_H
// include guard; the opening #ifndef is not visible in this excerpt.
213 #endif /* RGW_PROCESS_H */