Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_sync_module_es.cc
1 #include "rgw_common.h"
2 #include "rgw_coroutine.h"
3 #include "rgw_sync_module.h"
4 #include "rgw_data_sync.h"
5 #include "rgw_sync_module_es.h"
6 #include "rgw_sync_module_es_rest.h"
7 #include "rgw_rest_conn.h"
8 #include "rgw_cr_rest.h"
9 #include "rgw_op.h"
10 #include "rgw_es_query.h"
11
12 #include "include/str_list.h"
13
14 #include <boost/asio/yield.hpp>
15
16 #define dout_subsys ceph_subsys_rgw
17
18
19 /*
20  * whitelist utility. Config string is a list of entries, where an entry is either an item,
21  * a prefix, or a suffix. An item would be the name of the entity that we'd look up,
22  * a prefix would be a string ending with an asterisk, a suffix would be a string starting
23  * with an asterisk. For example:
24  *
25  *      bucket1, bucket2, foo*, *bar
26  */
27 class ItemList {
28   bool approve_all{false};
29
30   set<string> entries;
31   set<string> prefixes;
32   set<string> suffixes;
33
34   void parse(const string& str) {
35     list<string> l;
36
37     get_str_list(str, ",", l);
38
39     for (auto& entry : l) {
40       entry = rgw_trim_whitespace(entry);
41       if (entry.empty()) {
42         continue;
43       }
44
45       if (entry == "*") {
46         approve_all = true;
47         return;
48       }
49
50       if (entry[0] == '*') {
51         suffixes.insert(entry.substr(1));
52         continue;
53       }
54
55       if (entry.back() == '*') {
56         prefixes.insert(entry.substr(0, entry.size() - 1));
57         continue;
58       }
59
60       entries.insert(entry);
61     }
62   }
63
64 public:
65   ItemList() {}
66   void init(const string& str, bool def_val) {
67     if (str.empty()) {
68       approve_all = def_val;
69     } else {
70       parse(str);
71     }
72   }
73
74   bool exists(const string& entry) {
75     if (approve_all) {
76       return true;
77     }
78
79     if (entries.find(entry) != entries.end()) {
80       return true;
81     }
82
83     auto i = prefixes.upper_bound(entry);
84     if (i != prefixes.begin()) {
85       --i;
86       if (boost::algorithm::starts_with(entry, *i)) {
87         return true;
88       }
89     }
90
91     for (i = suffixes.begin(); i != suffixes.end(); ++i) {
92       if (boost::algorithm::ends_with(entry, *i)) {
93         return true;
94       }
95     }
96
97     return false;
98   }
99 };
100
101 #define ES_NUM_SHARDS_MIN 5
102
103 #define ES_NUM_SHARDS_DEFAULT 16
104 #define ES_NUM_REPLICAS_DEFAULT 1
105
106 struct ElasticConfig {
107   uint64_t sync_instance{0};
108   string id;
109   string index_path;
110   std::unique_ptr<RGWRESTConn> conn;
111   bool explicit_custom_meta{true};
112   string override_index_path;
113   ItemList index_buckets;
114   ItemList allow_owners;
115   uint32_t num_shards{0};
116   uint32_t num_replicas{0};
117
118   void init(CephContext *cct, const map<string, string, ltstr_nocase>& config) {
119     string elastic_endpoint = rgw_conf_get(config, "endpoint", "");
120     id = string("elastic:") + elastic_endpoint;
121     conn.reset(new RGWRESTConn(cct, nullptr, id, { elastic_endpoint }));
122     explicit_custom_meta = rgw_conf_get_bool(config, "explicit_custom_meta", true);
123     index_buckets.init(rgw_conf_get(config, "index_buckets_list", ""), true); /* approve all buckets by default */
124     allow_owners.init(rgw_conf_get(config, "approved_owners_list", ""), true); /* approve all bucket owners by default */
125     override_index_path = rgw_conf_get(config, "override_index_path", "");
126     num_shards = rgw_conf_get_int(config, "num_shards", ES_NUM_SHARDS_DEFAULT);
127     if (num_shards < ES_NUM_SHARDS_MIN) {
128       num_shards = ES_NUM_SHARDS_MIN;
129     }
130     num_replicas = rgw_conf_get_int(config, "num_replicas", ES_NUM_REPLICAS_DEFAULT);
131   }
132
133   void init_instance(RGWRealm& realm, uint64_t instance_id) {
134     sync_instance = instance_id;
135
136     if (!override_index_path.empty()) {
137       index_path = override_index_path;
138       return;
139     }
140
141     char buf[32];
142     snprintf(buf, sizeof(buf), "-%08x", (uint32_t)(sync_instance & 0xFFFFFFFF));
143
144     index_path = "/rgw-" + realm.get_name() + buf;
145   }
146
147   string get_index_path() {
148     return index_path;
149   }
150
151   string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
152     return index_path +  "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance);
153   }
154
155   bool should_handle_operation(RGWBucketInfo& bucket_info) {
156     return index_buckets.exists(bucket_info.bucket.name) &&
157            allow_owners.exists(bucket_info.owner.to_str());
158   }
159 };
160
161 using ElasticConfigRef = std::shared_ptr<ElasticConfig>;
162
163 struct es_dump_type {
164   const char *type;
165   const char *format;
166   bool analyzed;
167
168   es_dump_type(const char *t, const char *f = nullptr, bool a = false) : type(t), format(f), analyzed(a) {}
169
170   void dump(Formatter *f) const {
171     encode_json("type", type, f);
172     if (format) {
173       encode_json("format", format, f);
174     }
175     if (!analyzed && strcmp(type, "string") == 0) {
176       encode_json("index", "not_analyzed", f);
177     }
178   }
179 };
180
181 struct es_index_mappings {
182   void dump_custom(Formatter *f, const char *section, const char *type, const char *format) const {
183     f->open_object_section(section);
184     ::encode_json("type", "nested", f);
185     f->open_object_section("properties");
186     encode_json("name", es_dump_type("string"), f);
187     encode_json("value", es_dump_type(type, format), f);
188     f->close_section(); // entry
189     f->close_section(); // custom-string
190   }
191   void dump(Formatter *f) const {
192     f->open_object_section("object");
193     f->open_object_section("properties");
194     encode_json("bucket", es_dump_type("string"), f);
195     encode_json("name", es_dump_type("string"), f);
196     encode_json("instance", es_dump_type("string"), f);
197     encode_json("versioned_epoch", es_dump_type("long"), f);
198     f->open_object_section("meta");
199     f->open_object_section("properties");
200     encode_json("cache_control", es_dump_type("string"), f);
201     encode_json("content_disposition", es_dump_type("string"), f);
202     encode_json("content_encoding", es_dump_type("string"), f);
203     encode_json("content_language", es_dump_type("string"), f);
204     encode_json("content_type", es_dump_type("string"), f);
205     encode_json("etag", es_dump_type("string"), f);
206     encode_json("expires", es_dump_type("string"), f);
207     f->open_object_section("mtime");
208     ::encode_json("type", "date", f);
209     ::encode_json("format", "strict_date_optional_time||epoch_millis", f);
210     f->close_section(); // mtime
211     encode_json("size", es_dump_type("long"), f);
212     dump_custom(f, "custom-string", "string", nullptr);
213     dump_custom(f, "custom-int", "long", nullptr);
214     dump_custom(f, "custom-date", "date", "strict_date_optional_time||epoch_millis");
215     f->close_section(); // properties
216     f->close_section(); // meta
217     f->close_section(); // properties
218     f->close_section(); // object
219   }
220 };
221
222 struct es_index_settings {
223   uint32_t num_replicas;
224   uint32_t num_shards;
225
226   es_index_settings(uint32_t _replicas, uint32_t _shards) : num_replicas(_replicas), num_shards(_shards) {}
227
228   void dump(Formatter *f) const {
229     encode_json("number_of_replicas", num_replicas, f);
230     encode_json("number_of_shards", num_shards, f);
231   }
232 };
233
234 struct es_index_config {
235   es_index_settings settings;
236   es_index_mappings mappings;
237
238   es_index_config(es_index_settings& _s, es_index_mappings& _m) : settings(_s), mappings(_m) {}
239
240   void dump(Formatter *f) const {
241     encode_json("settings", settings, f);
242     encode_json("mappings", mappings, f);
243   }
244 };
245
246 struct es_obj_metadata {
247   CephContext *cct;
248   ElasticConfigRef es_conf;
249   RGWBucketInfo bucket_info;
250   rgw_obj_key key;
251   ceph::real_time mtime;
252   uint64_t size;
253   map<string, bufferlist> attrs;
254   uint64_t versioned_epoch;
255
256   es_obj_metadata(CephContext *_cct, ElasticConfigRef _es_conf, const RGWBucketInfo& _bucket_info,
257                   const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
258                   map<string, bufferlist>& _attrs, uint64_t _versioned_epoch) : cct(_cct), es_conf(_es_conf), bucket_info(_bucket_info), key(_key),
259                                                      mtime(_mtime), size(_size), attrs(std::move(_attrs)), versioned_epoch(_versioned_epoch) {}
260
261   void dump(Formatter *f) const {
262     map<string, string> out_attrs;
263     map<string, string> custom_meta;
264     RGWAccessControlPolicy policy;
265     set<string> permissions;
266     RGWObjTags obj_tags;
267
268     for (auto i : attrs) {
269       const string& attr_name = i.first;
270       string name;
271       bufferlist& val = i.second;
272
273       if (attr_name.compare(0, sizeof(RGW_ATTR_PREFIX) - 1, RGW_ATTR_PREFIX) != 0) {
274         continue;
275       }
276
277       if (attr_name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
278         name = attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1);
279         custom_meta[name] = string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0));
280         continue;
281       }
282
283       name = attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1);
284
285       if (name == "acl") {
286         try {
287           auto i = val.begin();
288           ::decode(policy, i);
289         } catch (buffer::error& err) {
290           ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
291         }
292
293         const RGWAccessControlList& acl = policy.get_acl();
294
295         permissions.insert(policy.get_owner().get_id().to_str());
296         for (auto acliter : acl.get_grant_map()) {
297           const ACLGrant& grant = acliter.second;
298           if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
299               ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
300             rgw_user user;
301             if (grant.get_id(user)) {
302               permissions.insert(user.to_str());
303             }
304           }
305         }
306       } else if (name == "x-amz-tagging") {
307         auto tags_bl = val.begin();
308         ::decode(obj_tags, tags_bl);
309       } else {
310         if (name != "pg_ver" &&
311             name != "source_zone" &&
312             name != "idtag") {
313           out_attrs[name] = string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0));
314         }
315       }
316     }
317     ::encode_json("bucket", bucket_info.bucket.name, f);
318     ::encode_json("name", key.name, f);
319     ::encode_json("instance", key.instance, f);
320     ::encode_json("versioned_epoch", versioned_epoch, f);
321     ::encode_json("owner", policy.get_owner(), f);
322     ::encode_json("permissions", permissions, f);
323     f->open_object_section("meta");
324     ::encode_json("size", size, f);
325
326     string mtime_str;
327     rgw_to_iso8601(mtime, &mtime_str);
328     ::encode_json("mtime", mtime_str, f);
329     for (auto i : out_attrs) {
330       ::encode_json(i.first.c_str(), i.second, f);
331     }
332     map<string, string> custom_str;
333     map<string, string> custom_int;
334     map<string, string> custom_date;
335
336     for (auto i : custom_meta) {
337       auto config = bucket_info.mdsearch_config.find(i.first);
338       if (config == bucket_info.mdsearch_config.end()) {
339         if (!es_conf->explicit_custom_meta) {
340           /* default custom meta is of type string */
341           custom_str[i.first] = i.second;
342         } else {
343           ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
344         }
345         continue;
346       }
347       switch (config->second) {
348         case ESEntityTypeMap::ES_ENTITY_DATE:
349           custom_date[i.first] = i.second;
350           break;
351         case ESEntityTypeMap::ES_ENTITY_INT:
352           custom_int[i.first] = i.second;
353           break;
354         default:
355           custom_str[i.first] = i.second;
356       }
357     }
358
359     if (!custom_str.empty()) {
360       f->open_array_section("custom-string");
361       for (auto i : custom_str) {
362         f->open_object_section("entity");
363         ::encode_json("name", i.first.c_str(), f);
364         ::encode_json("value", i.second, f);
365         f->close_section();
366       }
367       f->close_section();
368     }
369     if (!custom_int.empty()) {
370       f->open_array_section("custom-int");
371       for (auto i : custom_int) {
372         f->open_object_section("entity");
373         ::encode_json("name", i.first.c_str(), f);
374         ::encode_json("value", i.second, f);
375         f->close_section();
376       }
377       f->close_section();
378     }
379     if (!custom_date.empty()) {
380       f->open_array_section("custom-date");
381       for (auto i : custom_date) {
382         /*
383          * try to exlicitly parse date field, otherwise elasticsearch could reject the whole doc,
384          * which will end up with failed sync
385          */
386         real_time t;
387         int r = parse_time(i.second.c_str(), &t);
388         if (r < 0) {
389           ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
390           continue;
391         }
392
393         string time_str;
394         rgw_to_iso8601(t, &time_str);
395
396         f->open_object_section("entity");
397         ::encode_json("name", i.first.c_str(), f);
398         ::encode_json("value", time_str.c_str(), f);
399         f->close_section();
400       }
401       f->close_section();
402     }
403     f->close_section(); // meta
404     const auto& m = obj_tags.get_tags();
405     if (m.size() > 0){
406       f->open_array_section("tagging");
407       for (const auto &it : m) {
408         f->open_object_section("tag");
409         ::encode_json("key", it.first, f);
410         ::encode_json("value",it.second, f);
411         f->close_section();
412       }
413       f->close_section(); // tagging
414     }
415   }
416 };
417
418 class RGWElasticInitConfigCBCR : public RGWCoroutine {
419   RGWDataSyncEnv *sync_env;
420   ElasticConfigRef conf;
421 public:
422   RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
423                           ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
424                                                     sync_env(_sync_env),
425                                                     conf(_conf) {}
426   int operate() override {
427     reenter(this) {
428       ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
429       yield {
430         string path = conf->get_index_path();
431
432         es_index_settings settings(conf->num_replicas, conf->num_shards);
433         es_index_mappings mappings;
434
435         es_index_config index_conf(settings, mappings);
436
437         call(new RGWPutRESTResourceCR<es_index_config, int>(sync_env->cct, conf->conn.get(),
438                                                               sync_env->http_manager,
439                                                               path, nullptr /* params */,
440                                                               index_conf, nullptr /* result */));
441       }
442       if (retcode < 0) {
443         return set_cr_error(retcode);
444       }
445       return set_cr_done();
446     }
447     return 0;
448   }
449
450 };
451
452 class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
453   ElasticConfigRef conf;
454   uint64_t versioned_epoch;
455 public:
456   RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
457                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
458                           ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
459                                                                                versioned_epoch(_versioned_epoch) {}
460   int operate() override {
461     reenter(this) {
462       ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
463                                << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
464                                << " attrs=" << attrs << dendl;
465       yield {
466         string path = conf->get_obj_path(bucket_info, key);
467         es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
468
469         call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
470                                                             sync_env->http_manager,
471                                                             path, nullptr /* params */,
472                                                             doc, nullptr /* result */));
473
474       }
475       if (retcode < 0) {
476         return set_cr_error(retcode);
477       }
478       return set_cr_done();
479     }
480     return 0;
481   }
482 };
483
484 class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
485   ElasticConfigRef conf;
486   uint64_t versioned_epoch;
487 public:
488   RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
489                         RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
490                         ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
491                                                            conf(_conf), versioned_epoch(_versioned_epoch) {
492   }
493
494   ~RGWElasticHandleRemoteObjCR() override {}
495
496   RGWStatRemoteObjCBCR *allocate_callback() override {
497     return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
498   }
499 };
500
501 class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
502   RGWDataSyncEnv *sync_env;
503   RGWBucketInfo bucket_info;
504   rgw_obj_key key;
505   ceph::real_time mtime;
506   ElasticConfigRef conf;
507 public:
508   RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
509                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
510                           ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
511                                                         bucket_info(_bucket_info), key(_key),
512                                                         mtime(_mtime), conf(_conf) {}
513   int operate() override {
514     reenter(this) {
515       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
516                                << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
517       yield {
518         string path = conf->get_obj_path(bucket_info, key);
519
520         call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
521                                          sync_env->http_manager,
522                                          path, nullptr /* params */));
523       }
524       if (retcode < 0) {
525         return set_cr_error(retcode);
526       }
527       return set_cr_done();
528     }
529     return 0;
530   }
531
532 };
533
534 class RGWElasticDataSyncModule : public RGWDataSyncModule {
535   ElasticConfigRef conf;
536 public:
537   RGWElasticDataSyncModule(CephContext *cct, const map<string, string, ltstr_nocase>& config) : conf(std::make_shared<ElasticConfig>()) {
538     conf->init(cct, config);
539   }
540   ~RGWElasticDataSyncModule() override {}
541
542   void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
543     conf->init_instance(sync_env->store->get_realm(), instance_id);
544   }
545
546   RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
547     ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
548     return new RGWElasticInitConfigCBCR(sync_env, conf);
549   }
550   RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
551     ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
552     if (!conf->should_handle_operation(bucket_info)) {
553       ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
554       return nullptr;
555     }
556     return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch);
557   }
558   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
559     /* versioned and versioned epoch params are useless in the elasticsearch backend case */
560     ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
561     if (!conf->should_handle_operation(bucket_info)) {
562       ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
563       return nullptr;
564     }
565     return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
566   }
567   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
568                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
569     ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
570                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
571     ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
572     return NULL;
573   }
574   RGWRESTConn *get_rest_conn() {
575     return conf->conn.get();
576   }
577
578   string get_index_path() {
579     return conf->get_index_path();
580   }
581 };
582
583 RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const map<string, string, ltstr_nocase>& config)
584 {
585   data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
586 }
587
588 RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
589 {
590   return data_handler.get();
591 }
592
593 RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
594 {
595   return data_handler->get_rest_conn();
596 }
597
598 string RGWElasticSyncModuleInstance::get_index_path() {
599   return data_handler->get_index_path();
600 }
601
602 RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
603   if (dialect != RGW_REST_S3) {
604     return orig;
605   }
606   delete orig;
607   return new RGWRESTMgr_MDSearch_S3();
608 }
609
610 int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance) {
611   string endpoint;
612   auto i = config.find("endpoint");
613   if (i != config.end()) {
614     endpoint = i->second;
615   }
616   instance->reset(new RGWElasticSyncModuleInstance(cct, config));
617   return 0;
618 }
619