Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_process.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef RGW_PROCESS_H
5 #define RGW_PROCESS_H
6
7 #include "rgw_common.h"
8 #include "rgw_rados.h"
9 #include "rgw_acl.h"
10 #include "rgw_auth_registry.h"
11 #include "rgw_user.h"
12 #include "rgw_op.h"
13 #include "rgw_rest.h"
14
15 #include "include/assert.h"
16
17 #include "common/WorkQueue.h"
18 #include "common/Throttle.h"
19
20 #include <atomic>
21
22 #if !defined(dout_subsys)
23 #define dout_subsys ceph_subsys_rgw
24 #define def_dout_subsys
25 #endif
26
27 #define dout_context g_ceph_context
28
/* Declared here, defined in another translation unit.
 * NOTE(review): presumably asks the running process to shut down -- confirm
 * against the definition. */
void signal_shutdown();
30
31 struct RGWProcessEnv {
32   RGWRados *store;
33   RGWREST *rest;
34   OpsLogSocket *olog;
35   int port;
36   std::string uri_prefix;
37   std::shared_ptr<rgw::auth::StrategyRegistry> auth_registry;
38 };
39
40 class RGWFrontendConfig;
41
42 class RGWProcess {
43   deque<RGWRequest*> m_req_queue;
44 protected:
45   CephContext *cct;
46   RGWRados* store;
47   rgw_auth_registry_ptr_t auth_registry;
48   OpsLogSocket* olog;
49   ThreadPool m_tp;
50   Throttle req_throttle;
51   RGWREST* rest;
52   RGWFrontendConfig* conf;
53   int sock_fd;
54   std::string uri_prefix;
55
56   struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
57     RGWProcess* process;
58     RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
59       : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
60                                           tp), process(p) {}
61
62     bool _enqueue(RGWRequest* req) override {
63       process->m_req_queue.push_back(req);
64       perfcounter->inc(l_rgw_qlen);
65       dout(20) << "enqueued request req=" << hex << req << dec << dendl;
66       _dump_queue();
67       return true;
68     }
69
70     void _dequeue(RGWRequest* req) override {
71       ceph_abort();
72     }
73
74     bool _empty() override {
75       return process->m_req_queue.empty();
76     }
77
78     RGWRequest* _dequeue() override {
79       if (process->m_req_queue.empty())
80         return NULL;
81       RGWRequest *req = process->m_req_queue.front();
82       process->m_req_queue.pop_front();
83       dout(20) << "dequeued request req=" << hex << req << dec << dendl;
84       _dump_queue();
85       perfcounter->inc(l_rgw_qlen, -1);
86       return req;
87     }
88
89     using ThreadPool::WorkQueue<RGWRequest>::_process;
90
91     void _process(RGWRequest *req, ThreadPool::TPHandle &) override  {
92       perfcounter->inc(l_rgw_qactive);
93       process->handle_request(req);
94       process->req_throttle.put(1);
95       perfcounter->inc(l_rgw_qactive, -1);
96     }
97
98     void _dump_queue();
99
100     void _clear() override {
101       assert(process->m_req_queue.empty());
102     }
103   } req_wq;
104
105 public:
106   RGWProcess(CephContext* const cct,
107              RGWProcessEnv* const pe,
108              const int num_threads,
109              RGWFrontendConfig* const conf)
110     : cct(cct),
111       store(pe->store),
112       auth_registry(pe->auth_registry),
113       olog(pe->olog),
114       m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
115       req_throttle(cct, "rgw_ops", num_threads * 2),
116       rest(pe->rest),
117       conf(conf),
118       sock_fd(-1),
119       uri_prefix(pe->uri_prefix),
120       req_wq(this, g_conf->rgw_op_thread_timeout,
121              g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
122   }
123   
124   virtual ~RGWProcess() = default;
125
126   virtual void run() = 0;
127   virtual void handle_request(RGWRequest *req) = 0;
128
129   void pause() {
130     m_tp.pause();
131   }
132
133   void unpause_with_new_config(RGWRados* const store,
134                                rgw_auth_registry_ptr_t auth_registry) {
135     this->store = store;
136     this->auth_registry = std::move(auth_registry);
137     m_tp.unpause();
138   }
139
140   void close_fd() {
141     if (sock_fd >= 0) {
142       ::close(sock_fd);
143       sock_fd = -1;
144     }
145   }
146 }; /* RGWProcess */
147
148 class RGWFCGXProcess : public RGWProcess {
149   int max_connections;
150 public:
151
152   /* have a bit more connections than threads so that requests are
153    * still accepted even if we're still processing older requests */
154   RGWFCGXProcess(CephContext* const cct,
155                  RGWProcessEnv* const pe,
156                  const int num_threads,
157                  RGWFrontendConfig* const conf)
158     : RGWProcess(cct, pe, num_threads, conf),
159       max_connections(num_threads + (num_threads >> 3)) {
160   }
161
162   void run() override;
163   void handle_request(RGWRequest* req) override;
164 };
165
166 class RGWProcessControlThread : public Thread {
167   RGWProcess *pprocess;
168 public:
169   RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
170
171   void *entry() override {
172     pprocess->run();
173     return NULL;
174   }
175 };
176
177 class RGWLoadGenProcess : public RGWProcess {
178   RGWAccessKey access_key;
179 public:
180   RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
181                   RGWFrontendConfig* _conf) :
182   RGWProcess(cct, pe, num_threads, _conf) {}
183   void run() override;
184   void checkpoint();
185   void handle_request(RGWRequest* req) override;
186   void gen_request(const string& method, const string& resource,
187                   int content_length, std::atomic<bool>* fail_flag);
188
189   void set_access_key(RGWAccessKey& key) { access_key = key; }
190 };
191
192 /* process stream request */
193 extern int process_request(RGWRados* store,
194                            RGWREST* rest,
195                            RGWRequest* req,
196                            const std::string& frontend_prefix,
197                            const rgw_auth_registry_t& auth_registry,
198                            RGWRestfulIO* client_io,
199                            OpsLogSocket* olog);
200
201 extern int rgw_process_authenticated(RGWHandler_REST* handler,
202                                      RGWOp*& op,
203                                      RGWRequest* req,
204                                      req_state* s,
205                                      bool skip_retarget = false);
206
207 #if defined(def_dout_subsys)
208 #undef def_dout_subsys
209 #undef dout_subsys
210 #endif
211 #undef dout_context
212
213 #endif /* RGW_PROCESS_H */