Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_frontend.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef RGW_FRONTEND_H
5 #define RGW_FRONTEND_H
6
7 #include <map>
8 #include <string>
9
10 #include "rgw_request.h"
11 #include "rgw_process.h"
12 #include "rgw_realm_reloader.h"
13
14 #include "rgw_civetweb.h"
15 #include "rgw_civetweb_log.h"
16 #include "civetweb/civetweb.h"
17
18 #include "rgw_auth_registry.h"
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
22
23 class RGWFrontendConfig {
24   std::string config;
25   std::map<std::string, std::string> config_map;
26   std::string framework;
27
28   int parse_config(const std::string& config,
29                    std::map<std::string, std::string>& config_map);
30
31 public:
32   RGWFrontendConfig(const std::string& config)
33     : config(config) {
34   }
35
36   int init() {
37     const int ret = parse_config(config, config_map);
38     return ret < 0 ? ret : 0;
39   }
40
41   bool get_val(const std::string& key,
42                const std::string& def_val,
43                std::string* out);
44   bool get_val(const std::string& key, int def_val, int *out);
45
46   std::string get_val(const std::string& key,
47                       const std::string& def_val) {
48     std::string out;
49     get_val(key, def_val, &out);
50     return out;
51   }
52
53   const std::string& get_config() {
54     return config;
55   }
56
57   std::map<std::string, std::string>& get_config_map() {
58     return config_map;
59   }
60
61   std::string get_framework() const {
62     return framework;
63  }
64 };
65
66 class RGWFrontend {
67 public:
68   virtual ~RGWFrontend() {}
69
70   virtual int init() = 0;
71
72   virtual int run() = 0;
73   virtual void stop() = 0;
74   virtual void join() = 0;
75
76   virtual void pause_for_new_config() = 0;
77   virtual void unpause_with_new_config(RGWRados* store,
78                                        rgw_auth_registry_ptr_t auth_registry) = 0;
79 };
80
81
82 struct RGWMongooseEnv : public RGWProcessEnv {
83   // every request holds a read lock, so we need to prioritize write locks to
84   // avoid starving pause_for_new_config()
85   static constexpr bool prioritize_write = true;
86   RWLock mutex;
87
88   RGWMongooseEnv(const RGWProcessEnv &env)
89     : RGWProcessEnv(env),
90       mutex("RGWCivetWebFrontend", false, true, prioritize_write) {
91   }
92 };
93
94
95 class RGWCivetWebFrontend : public RGWFrontend {
96   RGWFrontendConfig* conf;
97   struct mg_context* ctx;
98   RGWMongooseEnv env;
99
100   void set_conf_default(std::map<std::string, std::string>& m,
101                         const std::string& key,
102                         const std::string& def_val) {
103     if (m.find(key) == std::end(m)) {
104       m[key] = def_val;
105     }
106   }
107
108 public:
109   RGWCivetWebFrontend(RGWProcessEnv& env,
110                       RGWFrontendConfig* conf)
111     : conf(conf),
112       ctx(nullptr),
113       env(env) {
114   }
115
116   int init() override {
117     return 0;
118   }
119
120   int run() override;
121
122   int process(struct mg_connection* conn);
123
124   void stop() override {
125     if (ctx) {
126       mg_stop(ctx);
127     }
128   }
129
130   void join() override {
131     return;
132   }
133
134   void pause_for_new_config() override {
135     // block callbacks until unpause
136     env.mutex.get_write();
137   }
138
139   void unpause_with_new_config(RGWRados* const store,
140                                rgw_auth_registry_ptr_t auth_registry) override {
141     env.store = store;
142     env.auth_registry = std::move(auth_registry);
143     // unpause callbacks
144     env.mutex.put_write();
145   }
146 }; /* RGWCivetWebFrontend */
147
148 class RGWProcessFrontend : public RGWFrontend {
149 protected:
150   RGWFrontendConfig* conf;
151   RGWProcess* pprocess;
152   RGWProcessEnv env;
153   RGWProcessControlThread* thread;
154
155 public:
156   RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
157     : conf(_conf), pprocess(nullptr), env(pe), thread(nullptr) {
158   }
159
160   ~RGWProcessFrontend() override {
161     delete thread;
162     delete pprocess;
163   }
164
165   int run() override {
166     assert(pprocess); /* should have initialized by init() */
167     thread = new RGWProcessControlThread(pprocess);
168     thread->create("rgw_frontend");
169     return 0;
170   }
171
172   void stop() override;
173
174   void join() override {
175     thread->join();
176   }
177
178   void pause_for_new_config() override {
179     pprocess->pause();
180   }
181
182   void unpause_with_new_config(RGWRados* const store,
183                                rgw_auth_registry_ptr_t auth_registry) override {
184     env.store = store;
185     env.auth_registry = auth_registry;
186     pprocess->unpause_with_new_config(store, std::move(auth_registry));
187   }
188 }; /* RGWProcessFrontend */
189
190 class RGWFCGXFrontend : public RGWProcessFrontend {
191 public:
192   RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
193     : RGWProcessFrontend(pe, _conf) {}
194
195   int init() override {
196     pprocess = new RGWFCGXProcess(g_ceph_context, &env,
197                                   g_conf->rgw_thread_pool_size, conf);
198     return 0;
199   }
200 }; /* RGWFCGXFrontend */
201
202 class RGWLoadGenFrontend : public RGWProcessFrontend {
203 public:
204   RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf)
205     : RGWProcessFrontend(pe, _conf) {}
206
207   int init() override {
208     int num_threads;
209     conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads);
210     RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env,
211                                                   num_threads, conf);
212
213     pprocess = pp;
214
215     string uid_str;
216     conf->get_val("uid", "", &uid_str);
217     if (uid_str.empty()) {
218       derr << "ERROR: uid param must be specified for loadgen frontend"
219            << dendl;
220       return -EINVAL;
221     }
222
223     rgw_user uid(uid_str);
224
225     RGWUserInfo user_info;
226     int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
227     if (ret < 0) {
228       derr << "ERROR: failed reading user info: uid=" << uid << " ret="
229            << ret << dendl;
230       return ret;
231     }
232
233     map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
234     if (aiter == user_info.access_keys.end()) {
235       derr << "ERROR: user has no S3 access keys set" << dendl;
236       return -EINVAL;
237     }
238
239     pp->set_access_key(aiter->second);
240
241     return 0;
242   }
243 }; /* RGWLoadGenFrontend */
244
245 // FrontendPauser implementation for RGWRealmReloader
246 class RGWFrontendPauser : public RGWRealmReloader::Pauser {
247   std::list<RGWFrontend*> &frontends;
248   RGWRealmReloader::Pauser* pauser;
249
250  public:
251   RGWFrontendPauser(std::list<RGWFrontend*> &frontends,
252                     RGWRealmReloader::Pauser* pauser = nullptr)
253     : frontends(frontends), pauser(pauser) {}
254
255   void pause() override {
256     for (auto frontend : frontends)
257       frontend->pause_for_new_config();
258     if (pauser)
259       pauser->pause();
260   }
261   void resume(RGWRados *store) override {
262     /* Initialize the registry of auth strategies which will coordinate
263      * the dynamic reconfiguration. */
264     auto auth_registry = \
265       rgw::auth::StrategyRegistry::create(g_ceph_context, store);
266
267     for (auto frontend : frontends)
268       frontend->unpause_with_new_config(store, auth_registry);
269     if (pauser)
270       pauser->resume(store);
271   }
272 };
273
274 #endif /* RGW_FRONTEND_H */