Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_sync_module_es_rest.cc
1 #include "rgw_sync_module_es.h"
2 #include "rgw_sync_module_es_rest.h"
3 #include "rgw_es_query.h"
4 #include "rgw_op.h"
5 #include "rgw_rest.h"
6 #include "rgw_rest_s3.h"
7
8 #define dout_context g_ceph_context
9 #define dout_subsys ceph_subsys_rgw
10
11 struct es_index_obj_response {
12   string bucket;
13   rgw_obj_key key;
14   uint64_t versioned_epoch{0};
15   ACLOwner owner;
16   set<string> read_permissions;
17
18   struct {
19     uint64_t size{0};
20     ceph::real_time mtime;
21     string etag;
22     string content_type;
23     map<string, string> custom_str;
24     map<string, int64_t> custom_int;
25     map<string, string> custom_date;
26
27     template <class T>
28     struct _custom_entry {
29       string name;
30       T value;
31       void decode_json(JSONObj *obj) {
32         JSONDecoder::decode_json("name", name, obj);
33         JSONDecoder::decode_json("value", value, obj);
34       }
35     };
36
37     void decode_json(JSONObj *obj) {
38       JSONDecoder::decode_json("size", size, obj);
39       string mtime_str;
40       JSONDecoder::decode_json("mtime", mtime_str, obj);
41       parse_time(mtime_str.c_str(), &mtime);
42       JSONDecoder::decode_json("etag", etag, obj);
43       JSONDecoder::decode_json("content_type", content_type, obj);
44       list<_custom_entry<string> > str_entries;
45       JSONDecoder::decode_json("custom-string", str_entries, obj);
46       for (auto& e : str_entries) {
47         custom_str[e.name] = e.value;
48       }
49       list<_custom_entry<int64_t> > int_entries;
50       JSONDecoder::decode_json("custom-int", int_entries, obj);
51       for (auto& e : int_entries) {
52         custom_int[e.name] = e.value;
53       }
54       list<_custom_entry<string> > date_entries;
55       JSONDecoder::decode_json("custom-date", date_entries, obj);
56       for (auto& e : date_entries) {
57         custom_date[e.name] = e.value;
58       }
59     }
60   } meta;
61
62   void decode_json(JSONObj *obj) {
63     JSONDecoder::decode_json("bucket", bucket, obj);
64     JSONDecoder::decode_json("name", key.name, obj);
65     JSONDecoder::decode_json("instance", key.instance, obj);
66     JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj);
67     JSONDecoder::decode_json("permissions", read_permissions, obj);
68     JSONDecoder::decode_json("owner", owner, obj);
69     JSONDecoder::decode_json("meta", meta, obj);
70   }
71 };
72
73 struct es_search_response {
74   uint32_t took;
75   bool timed_out;
76   struct {
77     uint32_t total;
78     uint32_t successful;
79     uint32_t failed;
80     void decode_json(JSONObj *obj) {
81       JSONDecoder::decode_json("total", total, obj);
82       JSONDecoder::decode_json("successful", successful, obj);
83       JSONDecoder::decode_json("failed", failed, obj);
84     }
85   } shards;
86   struct obj_hit {
87     string index;
88     string type;
89     string id;
90     // double score
91     es_index_obj_response source;
92     void decode_json(JSONObj *obj) {
93       JSONDecoder::decode_json("_index", index, obj);
94       JSONDecoder::decode_json("_type", type, obj);
95       JSONDecoder::decode_json("_id", id, obj);
96       JSONDecoder::decode_json("_source", source, obj);
97     }
98   };
99   struct {
100     uint32_t total;
101     // double max_score;
102     list<obj_hit> hits;
103     void decode_json(JSONObj *obj) {
104       JSONDecoder::decode_json("total", total, obj);
105       // JSONDecoder::decode_json("max_score", max_score, obj);
106       JSONDecoder::decode_json("hits", hits, obj);
107     }
108   } hits;
109   void decode_json(JSONObj *obj) {
110     JSONDecoder::decode_json("took", took, obj);
111     JSONDecoder::decode_json("timed_out", timed_out, obj);
112     JSONDecoder::decode_json("_shards", shards, obj);
113     JSONDecoder::decode_json("hits", hits, obj);
114   }
115 };
116
117 class RGWMetadataSearchOp : public RGWOp {
118   RGWSyncModuleInstanceRef sync_module_ref;
119   RGWElasticSyncModuleInstance *es_module;
120 protected:
121   string expression;
122   string custom_prefix;
123 #define MAX_KEYS_DEFAULT 100
124   uint64_t max_keys{MAX_KEYS_DEFAULT};
125   string marker_str;
126   uint64_t marker{0};
127   string next_marker;
128   bool is_truncated{false};
129   string err;
130
131   es_search_response response;
132
133 public:
134   RGWMetadataSearchOp(const RGWSyncModuleInstanceRef& sync_module) : sync_module_ref(sync_module) {
135     es_module = static_cast<RGWElasticSyncModuleInstance *>(sync_module_ref.get());
136   }
137
138   int verify_permission() {
139     return 0;
140   }
141   virtual int get_params() = 0;
142   void pre_exec();
143   void execute();
144
145   virtual void send_response() = 0;
146   virtual const string name() { return "metadata_search"; }
147   virtual RGWOpType get_type() { return RGW_OP_METADATA_SEARCH; }
148   virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; }
149 };
150
151 void RGWMetadataSearchOp::pre_exec()
152 {
153   rgw_bucket_object_pre_exec(s);
154 }
155
156 void RGWMetadataSearchOp::execute()
157 {
158   op_ret = get_params();
159   if (op_ret < 0)
160     return;
161
162   list<pair<string, string> > conds;
163
164   if (!s->user->system) {
165     conds.push_back(make_pair("permissions", s->user->user_id.to_str()));
166   }
167
168   if (!s->bucket_name.empty()) {
169     conds.push_back(make_pair("bucket", s->bucket_name));
170   }
171
172   ESQueryCompiler es_query(expression, &conds, custom_prefix);
173   
174   static map<string, string, ltstr_nocase> aliases = {
175                                   { "bucket", "bucket" }, /* forces lowercase */
176                                   { "name", "name" },
177                                   { "key", "name" },
178                                   { "instance", "instance" },
179                                   { "etag", "meta.etag" },
180                                   { "size", "meta.size" },
181                                   { "mtime", "meta.mtime" },
182                                   { "lastmodified", "meta.mtime" },
183                                   { "contenttype", "meta.contenttype" },
184   };
185   es_query.set_field_aliases(&aliases);
186
187   static map<string, ESEntityTypeMap::EntityType> generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR},
188                                                            {"name", ESEntityTypeMap::ES_ENTITY_STR},
189                                                            {"instance", ESEntityTypeMap::ES_ENTITY_STR},
190                                                            {"permissions", ESEntityTypeMap::ES_ENTITY_STR},
191                                                            {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR},
192                                                            {"meta.contenttype", ESEntityTypeMap::ES_ENTITY_STR},
193                                                            {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE},
194                                                            {"meta.size", ESEntityTypeMap::ES_ENTITY_INT} };
195   ESEntityTypeMap gm(generic_map);
196   es_query.set_generic_type_map(&gm);
197
198   static set<string> restricted_fields = { {"permissions"} };
199   es_query.set_restricted_fields(&restricted_fields);
200
201   map<string, ESEntityTypeMap::EntityType> custom_map;
202   for (auto& i : s->bucket_info.mdsearch_config) {
203     custom_map[i.first] = (ESEntityTypeMap::EntityType)i.second;
204   }
205
206   ESEntityTypeMap em(custom_map);
207   es_query.set_custom_type_map(&em);
208
209   bool valid = es_query.compile(&err);
210   if (!valid) {
211     ldout(s->cct, 10) << "invalid query, failed generating request json" << dendl;
212     op_ret = -EINVAL;
213     return;
214   }
215
216   JSONFormatter f;
217   encode_json("root", es_query, &f);
218
219   RGWRESTConn *conn = es_module->get_rest_conn();
220
221   bufferlist in;
222   bufferlist out;
223
224   stringstream ss;
225
226   f.flush(ss);
227   in.append(ss.str());
228
229   string resource = es_module->get_index_path() + "/_search";
230   param_vec_t params;
231   static constexpr int BUFSIZE = 32;
232   char buf[BUFSIZE];
233   snprintf(buf, sizeof(buf), "%lld", (long long)max_keys);
234   params.push_back(param_pair_t("size", buf));
235   if (marker > 0) {
236     params.push_back(param_pair_t("from", marker_str.c_str()));
237   }
238   ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl;
239   op_ret = conn->get_resource(resource, &params, nullptr, out, &in);
240   if (op_ret < 0) {
241     ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl;
242     return;
243   }
244
245   ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl;
246
247   JSONParser jparser;
248   if (!jparser.parse(out.c_str(), out.length())) {
249     ldout(s->cct, 0) << "ERROR: failed to parse elasticsearch response" << dendl;
250     op_ret = -EINVAL;
251     return;
252   }
253
254   try {
255     decode_json_obj(response, &jparser);
256   } catch (JSONDecoder::err& e) {
257     ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl;
258     op_ret = -EINVAL;
259     return;
260   }
261
262 }
263
264 class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp {
265 public:
266   RGWMetadataSearch_ObjStore_S3(const RGWSyncModuleInstanceRef& _sync_module) : RGWMetadataSearchOp(_sync_module) {
267     custom_prefix = "x-amz-meta-";
268   }
269
270   int get_params() override {
271     expression = s->info.args.get("query");
272     bool exists;
273     string max_keys_str = s->info.args.get("max-keys", &exists);
274 #define MAX_KEYS_MAX 10000
275     if (exists) {
276       string err;
277       max_keys = strict_strtoll(max_keys_str.c_str(), 10, &err);
278       if (!err.empty()) {
279         return -EINVAL;
280       }
281       if (max_keys > MAX_KEYS_MAX) {
282         max_keys = MAX_KEYS_MAX;
283       }
284     }
285     marker_str = s->info.args.get("marker", &exists);
286     if (exists) {
287       string err;
288       marker = strict_strtoll(marker_str.c_str(), 10, &err);
289       if (!err.empty()) {
290         return -EINVAL;
291       }
292     }
293     uint64_t nm = marker + max_keys;
294     static constexpr int BUFSIZE = 32;
295     char buf[BUFSIZE];
296     snprintf(buf, sizeof(buf), "%lld", (long long)nm);
297     next_marker = buf;
298     return 0;
299   }
300   void send_response() override {
301     if (op_ret) {
302       s->err.message = err;
303       set_req_state_err(s, op_ret);
304     }
305     dump_errno(s);
306     end_header(s, this, "application/xml");
307
308     if (op_ret < 0) {
309       return;
310     }
311
312     is_truncated = (response.hits.hits.size() >= max_keys);
313
314     s->formatter->open_object_section("SearchMetadataResponse");
315     s->formatter->dump_string("Marker", marker_str);
316     s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false"));
317     if (is_truncated) {
318       s->formatter->dump_string("NextMarker", next_marker);
319     }
320     if (s->format == RGW_FORMAT_JSON) {
321       s->formatter->open_array_section("Objects");
322     }
323     for (auto& i : response.hits.hits) {
324       s->formatter->open_object_section("Contents");
325       es_index_obj_response& e = i.source;
326       s->formatter->dump_string("Bucket", e.bucket);
327       s->formatter->dump_string("Key", e.key.name);
328       string instance = (!e.key.instance.empty() ? e.key.instance : "null");
329       s->formatter->dump_string("Instance", instance.c_str());
330       s->formatter->dump_int("VersionedEpoch", e.versioned_epoch);
331       dump_time(s, "LastModified", &e.meta.mtime);
332       s->formatter->dump_int("Size", e.meta.size);
333       s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str());
334       s->formatter->dump_string("ContentType", e.meta.content_type.c_str());
335       dump_owner(s, e.owner.get_id(), e.owner.get_display_name());
336       s->formatter->open_array_section("CustomMetadata");
337       for (auto& m : e.meta.custom_str) {
338         s->formatter->open_object_section("Entry");
339         s->formatter->dump_string("Name", m.first.c_str());
340         s->formatter->dump_string("Value", m.second);
341         s->formatter->close_section();
342       }
343       for (auto& m : e.meta.custom_int) {
344         s->formatter->open_object_section("Entry");
345         s->formatter->dump_string("Name", m.first.c_str());
346         s->formatter->dump_int("Value", m.second);
347         s->formatter->close_section();
348       }
349       for (auto& m : e.meta.custom_date) {
350         s->formatter->open_object_section("Entry");
351         s->formatter->dump_string("Name", m.first.c_str());
352         s->formatter->dump_string("Value", m.second);
353         s->formatter->close_section();
354       }
355       s->formatter->close_section();
356       rgw_flush_formatter(s, s->formatter);
357       s->formatter->close_section();
358     };
359     if (s->format == RGW_FORMAT_JSON) {
360       s->formatter->close_section();
361     }
362     s->formatter->close_section();
363    rgw_flush_formatter_and_reset(s, s->formatter);
364   }
365 };
366
367 class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 {
368 protected:
369   RGWOp *op_get() {
370     if (s->info.args.exists("query")) {
371       return new RGWMetadataSearch_ObjStore_S3(store->get_sync_module());
372     }
373     if (!s->init_state.url_bucket.empty() &&
374         s->info.args.exists("mdsearch")) {
375       return new RGWGetBucketMetaSearch_ObjStore_S3;
376     }
377     return nullptr;
378   }
379   RGWOp *op_head() {
380     return nullptr;
381   }
382   RGWOp *op_post() {
383     return nullptr;
384   }
385 public:
386   RGWHandler_REST_MDSearch_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
387   virtual ~RGWHandler_REST_MDSearch_S3() {}
388 };
389
390
391 RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(struct req_state* const s,
392                                                      const rgw::auth::StrategyRegistry& auth_registry,
393                                                      const std::string& frontend_prefix)
394 {
395   int ret =
396     RGWHandler_REST_S3::init_from_header(s,
397                                         RGW_FORMAT_XML, true);
398   if (ret < 0) {
399     return nullptr;
400   }
401
402   if (!s->object.empty()) {
403     return nullptr;
404   }
405
406   RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3(auth_registry);
407
408   ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name()
409                     << dendl;
410   return handler;
411 }
412