Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_bucket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include <string>
7 #include <map>
8 #include <sstream>
9
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
12
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "common/backport14.h"
16 #include "rgw_rados.h"
17 #include "rgw_acl.h"
18 #include "rgw_acl_s3.h"
19
20 #include "include/types.h"
21 #include "rgw_bucket.h"
22 #include "rgw_user.h"
23 #include "rgw_string.h"
24 #include "rgw_multi.h"
25
26 #include "include/rados/librados.hpp"
27 // until everything is moved from rgw_common
28 #include "rgw_common.h"
29
30 #include "cls/user/cls_user_types.h"
31
32 #define dout_context g_ceph_context
33 #define dout_subsys ceph_subsys_rgw
34
35 #define BUCKET_TAG_TIMEOUT 30
36
37 using namespace std;
38
39 static RGWMetadataHandler *bucket_meta_handler = NULL;
40 static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
41
42 // define as static when RGWBucket implementation compete
43 void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
44 {
45   buckets_obj_id = user_id.to_str();
46   buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
47 }
48
49 /*
50  * Note that this is not a reversal of parse_bucket(). That one deals
51  * with the syntax we need in metadata and such. This one deals with
52  * the representation in RADOS pools. We chose '/' because it's not
53  * acceptable in bucket names and thus qualified buckets cannot conflict
54  * with the legacy or S3 buckets.
55  */
56 std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
57                                        const std::string& bucket_name) {
58   std::string bucket_entry;
59
60   if (bucket_name.empty()) {
61     bucket_entry.clear();
62   } else if (tenant_name.empty()) {
63     bucket_entry = bucket_name;
64   } else {
65     bucket_entry = tenant_name + "/" + bucket_name;
66   }
67
68   return bucket_entry;
69 }
70
71 /*
72  * Tenants are separated from buckets in URLs by a colon in S3.
73  * This function is not to be used on Swift URLs, not even for COPY arguments.
74  */
75 void rgw_parse_url_bucket(const string &bucket, const string& auth_tenant,
76                           string &tenant_name, string &bucket_name) {
77
78   int pos = bucket.find(':');
79   if (pos >= 0) {
80     /*
81      * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
82      * to refer to the legacy tenant, in case users in new named tenants
83      * want to access old global buckets.
84      */
85     tenant_name = bucket.substr(0, pos);
86     bucket_name = bucket.substr(pos + 1);
87   } else {
88     tenant_name = auth_tenant;
89     bucket_name = bucket;
90   }
91 }
92
93 /**
94  * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
95  * Returns: 0 on success, -ERR# on failure.
96  */
97 int rgw_read_user_buckets(RGWRados * store,
98                           const rgw_user& user_id,
99                           RGWUserBuckets& buckets,
100                           const string& marker,
101                           const string& end_marker,
102                           uint64_t max,
103                           bool need_stats,
104                           bool *is_truncated,
105                           uint64_t default_amount)
106 {
107   int ret;
108   buckets.clear();
109   std::string buckets_obj_id;
110   rgw_get_buckets_obj(user_id, buckets_obj_id);
111   rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
112
113   bool truncated = false;
114   string m = marker;
115
116   uint64_t total = 0;
117
118   if (!max) {
119     max = default_amount;
120   }
121
122   do {
123     std::list<cls_user_bucket_entry> entries;
124     ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated);
125     if (ret == -ENOENT) {
126       ret = 0;
127     }
128
129     if (ret < 0) {
130       return ret;
131     }
132
133     for (auto& entry : entries) {
134       buckets.add(RGWBucketEnt(user_id, std::move(entry)));
135       total++;
136     }
137
138   } while (truncated && total < max);
139
140   if (is_truncated != nullptr) {
141     *is_truncated = truncated;
142   }
143
144   if (need_stats) {
145     map<string, RGWBucketEnt>& m = buckets.get_buckets();
146     ret = store->update_containers_stats(m);
147     if (ret < 0 && ret != -ENOENT) {
148       ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
149       return ret;
150     }
151   }
152   return 0;
153 }
154
155 int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info)
156 {
157   string buckets_obj_id;
158   rgw_get_buckets_obj(user_id, buckets_obj_id);
159   rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
160
161   return store->cls_user_sync_bucket_stats(obj, bucket_info);
162 }
163
164 int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name)
165 {
166   RGWBucketInfo bucket_info;
167   RGWObjectCtx obj_ctx(store);
168   int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
169   if (ret < 0) {
170     ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
171     return ret;
172   }
173
174   ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info);
175   if (ret < 0) {
176     ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
177     return ret;
178   }
179
180   return 0;
181 }
182
183 int rgw_link_bucket(RGWRados* const store,
184                     const rgw_user& user_id,
185                     rgw_bucket& bucket,
186                     ceph::real_time creation_time,
187                     bool update_entrypoint)
188 {
189   int ret;
190   string& tenant_name = bucket.tenant;
191   string& bucket_name = bucket.name;
192
193   cls_user_bucket_entry new_bucket;
194
195   RGWBucketEntryPoint ep;
196   RGWObjVersionTracker ot;
197
198   bucket.convert(&new_bucket.bucket);
199   new_bucket.size = 0;
200   if (real_clock::is_zero(creation_time))
201     new_bucket.creation_time = real_clock::now();
202   else
203     new_bucket.creation_time = creation_time;
204
205   map<string, bufferlist> attrs;
206   RGWObjectCtx obj_ctx(store);
207
208   if (update_entrypoint) {
209     ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
210     if (ret < 0 && ret != -ENOENT) {
211       ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
212                              << cpp_strerror(-ret) << dendl;
213     }
214   }
215
216   string buckets_obj_id;
217   rgw_get_buckets_obj(user_id, buckets_obj_id);
218
219   rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
220   ret = store->cls_user_add_bucket(obj, new_bucket);
221   if (ret < 0) {
222     ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: "
223                            << cpp_strerror(-ret) << dendl;
224     goto done_err;
225   }
226
227   if (!update_entrypoint)
228     return 0;
229
230   ep.linked = true;
231   ep.owner = user_id;
232   ep.bucket = bucket;
233   ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
234   if (ret < 0)
235     goto done_err;
236
237   return 0;
238 done_err:
239   int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name);
240   if (r < 0) {
241     ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: "
242                            << cpp_strerror(-r) << dendl;
243   }
244   return ret;
245 }
246
247 int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint)
248 {
249   int ret;
250
251   string buckets_obj_id;
252   rgw_get_buckets_obj(user_id, buckets_obj_id);
253
254   cls_user_bucket bucket;
255   bucket.name = bucket_name;
256   rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
257   ret = store->cls_user_remove_bucket(obj, bucket);
258   if (ret < 0) {
259     ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: "
260         << cpp_strerror(-ret)<< dendl;
261   }
262
263   if (!update_entrypoint)
264     return 0;
265
266   RGWBucketEntryPoint ep;
267   RGWObjVersionTracker ot;
268   map<string, bufferlist> attrs;
269   RGWObjectCtx obj_ctx(store);
270   ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
271   if (ret == -ENOENT)
272     return 0;
273   if (ret < 0)
274     return ret;
275
276   if (!ep.linked)
277     return 0;
278
279   if (ep.owner != user_id) {
280     ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
281     return -EINVAL;
282   }
283
284   ep.linked = false;
285   return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
286 }
287
288 int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
289                           map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
290                           real_time mtime) {
291   return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
292 }
293
294 int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
295                           map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
296                           real_time mtime) {
297   return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
298 }
299
300 int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker) {
301   return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
302 }
303
304 // 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
305 // with the existing instance[:shard] format. once we parse the shard, the / is
306 // replaced with a : to match the [tenant:]instance format
307 void rgw_bucket_instance_key_to_oid(string& key)
308 {
309   // replace tenant/ with tenant:
310   auto c = key.find('/');
311   if (c != string::npos) {
312     key[c] = ':';
313   }
314 }
315
316 // convert bucket instance oids back to the tenant/ format for metadata keys.
317 // it's safe to parse 'tenant:' only for oids, because they won't contain the
318 // optional :shard at the end
319 void rgw_bucket_instance_oid_to_key(string& oid)
320 {
321   // find first : (could be tenant:bucket or bucket:instance)
322   auto c = oid.find(':');
323   if (c != string::npos) {
324     // if we find another :, the first one was for tenant
325     if (oid.find(':', c + 1) != string::npos) {
326       oid[c] = '/';
327     }
328   }
329 }
330
331 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
332 {
333   ssize_t pos = bucket_instance.rfind(':');
334   if (pos < 0) {
335     return -EINVAL;
336   }
337
338   string first = bucket_instance.substr(0, pos);
339   string second = bucket_instance.substr(pos + 1);
340
341   if (first.find(':') == string::npos) {
342     *shard_id = -1;
343     *target_bucket_instance = bucket_instance;
344     return 0;
345   }
346
347   *target_bucket_instance = first;
348   string err;
349   *shard_id = strict_strtol(second.c_str(), 10, &err);
350   if (!err.empty()) {
351     return -EINVAL;
352   }
353
354   return 0;
355 }
356
357 // parse key in format: [tenant/]name:instance[:shard_id]
358 int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
359                                 rgw_bucket *bucket, int *shard_id)
360 {
361   boost::string_ref name{key};
362   boost::string_ref instance;
363
364   // split tenant/name
365   auto pos = name.find('/');
366   if (pos != boost::string_ref::npos) {
367     auto tenant = name.substr(0, pos);
368     bucket->tenant.assign(tenant.begin(), tenant.end());
369     name = name.substr(pos + 1);
370   }
371
372   // split name:instance
373   pos = name.find(':');
374   if (pos != boost::string_ref::npos) {
375     instance = name.substr(pos + 1);
376     name = name.substr(0, pos);
377   }
378   bucket->name.assign(name.begin(), name.end());
379
380   // split instance:shard
381   pos = instance.find(':');
382   if (pos == boost::string_ref::npos) {
383     bucket->bucket_id.assign(instance.begin(), instance.end());
384     *shard_id = -1;
385     return 0;
386   }
387
388   // parse shard id
389   auto shard = instance.substr(pos + 1);
390   string err;
391   auto id = strict_strtol(shard.data(), 10, &err);
392   if (!err.empty()) {
393     ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
394         << instance.data() << "': " << err << dendl;
395     return -EINVAL;
396   }
397
398   *shard_id = id;
399   instance = instance.substr(0, pos);
400   bucket->bucket_id.assign(instance.begin(), instance.end());
401   return 0;
402 }
403
404 int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
405                          map<string, bufferlist>& attrs,
406                          RGWObjVersionTracker *objv_tracker)
407 {
408   rgw_bucket& bucket = bucket_info.bucket;
409
410   if (!bucket_info.has_instance_obj) {
411     /* an old bucket object, need to convert it */
412     RGWObjectCtx obj_ctx(store);
413     int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name);
414     if (ret < 0) {
415       ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
416       return ret;
417     }
418   }
419
420   /* we want the bucket instance name without the oid prefix cruft */
421   string key = bucket.get_key();
422   bufferlist bl;
423
424   ::encode(bucket_info, bl);
425
426   return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
427 }
428
429 static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
430         Formatter *f)
431 {
432   for (const auto& o : objs_to_unlink) {
433     f->dump_string("object",  o.name);
434   }
435 }
436
437 void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id,
438                                    bool fix)
439 {
440   RGWUserBuckets user_buckets;
441   bool is_truncated = false;
442   string marker;
443
444   CephContext *cct = store->ctx();
445
446   size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
447
448   do {
449     int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
450                                     string(), max_entries, false,
451                                     &is_truncated);
452     if (ret < 0) {
453       ldout(store->ctx(), 0) << "failed to read user buckets: "
454                              << cpp_strerror(-ret) << dendl;
455       return;
456     }
457
458     map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
459     for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
460          i != buckets.end();
461          ++i) {
462       marker = i->first;
463
464       RGWBucketEnt& bucket_ent = i->second;
465       rgw_bucket& bucket = bucket_ent.bucket;
466
467       RGWBucketInfo bucket_info;
468       real_time mtime;
469       RGWObjectCtx obj_ctx(store);
470       int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime);
471       if (r < 0) {
472         ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
473         continue;
474       }
475
476       rgw_bucket& actual_bucket = bucket_info.bucket;
477
478       if (actual_bucket.name.compare(bucket.name) != 0 ||
479           actual_bucket.tenant.compare(bucket.tenant) != 0 ||
480           actual_bucket.marker.compare(bucket.marker) != 0 ||
481           actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) {
482         cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
483         if (fix) {
484           cout << "fixing" << std::endl;
485           r = rgw_link_bucket(store, user_id, actual_bucket,
486                               bucket_info.creation_time);
487           if (r < 0) {
488             cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
489           }
490         }
491       }
492     }
493   } while (is_truncated);
494 }
495
496 static bool bucket_object_check_filter(const string& oid)
497 {
498   rgw_obj_key key;
499   string ns;
500   return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
501 }
502
503 int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key)
504 {
505   RGWObjectCtx rctx(store);
506
507   if (key.instance.empty()) {
508     key.instance = "null";
509   }
510
511   rgw_obj obj(bucket, key);
512
513   return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
514 }
515
516 int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children)
517 {
518   int ret;
519   map<RGWObjCategory, RGWStorageStats> stats;
520   std::vector<rgw_bucket_dir_entry> objs;
521   map<string, bool> common_prefixes;
522   RGWBucketInfo info;
523   RGWObjectCtx obj_ctx(store);
524
525   string bucket_ver, master_ver;
526
527   ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
528   if (ret < 0)
529     return ret;
530
531   ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
532   if (ret < 0)
533     return ret;
534
535   RGWRados::Bucket target(store, info);
536   RGWRados::Bucket::List list_op(&target);
537   CephContext *cct = store->ctx();
538   int max = 1000;
539
540   list_op.params.list_versions = true;
541
542   do {
543     objs.clear();
544
545     ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
546     if (ret < 0)
547       return ret;
548
549     if (!objs.empty() && !delete_children) {
550       lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
551       return -ENOTEMPTY;
552     }
553
554     for (const auto& obj : objs) {
555       rgw_obj_key key(obj.key);
556       ret = rgw_remove_object(store, info, bucket, key);
557       if (ret < 0)
558         return ret;
559     }
560
561   } while (!objs.empty());
562
563   string prefix, delimiter;
564
565   ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
566   if (ret < 0) {
567     return ret;
568   }
569
570   ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
571   if ( ret < 0) {
572      dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
573   }
574
575   RGWObjVersionTracker objv_tracker;
576
577   ret = store->delete_bucket(info, objv_tracker);
578   if (ret < 0) {
579     lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
580     return ret;
581   }
582
583   ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
584   if (ret < 0) {
585     lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
586   }
587
588   return ret;
589 }
590
591 static int aio_wait(librados::AioCompletion *handle)
592 {
593   librados::AioCompletion *c = (librados::AioCompletion *)handle;
594   c->wait_for_safe();
595   int ret = c->get_return_value();
596   c->release();
597   return ret;
598 }
599
600 static int drain_handles(list<librados::AioCompletion *>& pending)
601 {
602   int ret = 0;
603   while (!pending.empty()) {
604     librados::AioCompletion *handle = pending.front();
605     pending.pop_front();
606     int r = aio_wait(handle);
607     if (r < 0) {
608       ret = r;
609     }
610   }
611   return ret;
612 }
613
614 int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket,
615                                 int concurrent_max, bool keep_index_consistent)
616 {
617   int ret;
618   map<RGWObjCategory, RGWStorageStats> stats;
619   std::vector<rgw_bucket_dir_entry> objs;
620   map<string, bool> common_prefixes;
621   RGWBucketInfo info;
622   RGWObjectCtx obj_ctx(store);
623   CephContext *cct = store->ctx();
624
625   string bucket_ver, master_ver;
626
627   ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
628   if (ret < 0)
629     return ret;
630
631   ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
632   if (ret < 0)
633     return ret;
634
635   string prefix, delimiter;
636
637   ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
638   if (ret < 0) {
639     return ret;
640   }
641
642   RGWRados::Bucket target(store, info);
643   RGWRados::Bucket::List list_op(&target);
644
645   list_op.params.list_versions = true;
646
647   std::list<librados::AioCompletion*> handles;
648
649   int max = 1000;
650   int max_aio = concurrent_max;
651   ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
652   if (ret < 0)
653     return ret;
654
655   while (!objs.empty()) {
656     std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
657     for (; it != objs.end(); ++it) {
658       RGWObjState *astate = NULL;
659       rgw_obj obj(bucket, (*it).key);
660
661       ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false);
662       if (ret == -ENOENT) {
663         dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
664         continue;
665       }
666       if (ret < 0) {
667         lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
668         return ret;
669       }
670
671       if (astate->has_manifest) {
672         RGWObjManifest& manifest = astate->manifest;
673         RGWObjManifest::obj_iterator miter = manifest.obj_begin();
674         rgw_obj head_obj = manifest.get_obj();
675         rgw_raw_obj raw_head_obj;
676         store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
677
678
679         for (; miter != manifest.obj_end() && max_aio--; ++miter) {
680           if (!max_aio) {
681             ret = drain_handles(handles);
682             if (ret < 0) {
683               lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
684               return ret;
685             }
686             max_aio = concurrent_max;
687           }
688
689           rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
690           if (last_obj == raw_head_obj) {
691             // have the head obj deleted at the end
692             continue;
693           }
694
695           ret = store->delete_raw_obj_aio(last_obj, handles);
696           if (ret < 0) {
697             lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
698             return ret;
699           }
700         } // for all shadow objs
701
702         ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent);
703         if (ret < 0) {
704           lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
705           return ret;
706         }
707       }
708
709       if (!max_aio) {
710         ret = drain_handles(handles);
711         if (ret < 0) {
712           lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
713           return ret;
714         }
715         max_aio = concurrent_max;
716       }
717     } // for all RGW objects
718     objs.clear();
719
720     ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
721     if (ret < 0)
722       return ret;
723   }
724
725   ret = drain_handles(handles);
726   if (ret < 0) {
727     lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
728     return ret;
729   }
730
731   ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
732   if (ret < 0) {
733      dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" <<  ret << dendl;
734   }
735
736   RGWObjVersionTracker objv_tracker;
737
738   ret = rgw_bucket_delete_bucket_obj(store, bucket.tenant, bucket.name, objv_tracker);
739   if (ret < 0) {
740     lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << "with ret as " << ret << dendl;
741     return ret;
742   }
743
744   if (!store->is_syncing_bucket_meta(bucket)) {
745     RGWObjVersionTracker objv_tracker;
746     string entry = bucket.get_key();
747     ret = rgw_bucket_instance_remove_entry(store, entry, &objv_tracker);
748     if (ret < 0) {
749       lderr(store->ctx()) << "ERROR: could not remove bucket instance entry" << bucket.name << "with ret as " << ret << dendl;
750       return ret;
751     }
752   }
753
754   ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
755   if (ret < 0) {
756     lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
757   }
758
759   return ret;
760 }
761
762 int rgw_bucket_delete_bucket_obj(RGWRados *store,
763                                  const string& tenant_name,
764                                  const string& bucket_name,
765                                  RGWObjVersionTracker& objv_tracker)
766 {
767   string key;
768
769   rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
770   return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
771 }
772
773 static void set_err_msg(std::string *sink, std::string msg)
774 {
775   if (sink && !msg.empty())
776     *sink = msg;
777 }
778
779 int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state)
780 {
781   if (!storage)
782     return -EINVAL;
783
784   store = storage;
785
786   rgw_user user_id = op_state.get_user_id();
787   tenant = user_id.tenant;
788   bucket_name = op_state.get_bucket_name();
789   RGWUserBuckets user_buckets;
790   RGWObjectCtx obj_ctx(store);
791
792   if (bucket_name.empty() && user_id.empty())
793     return -EINVAL;
794
795   if (!bucket_name.empty()) {
796     int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL);
797     if (r < 0) {
798       ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
799       return r;
800     }
801
802     op_state.set_bucket(bucket_info.bucket);
803   }
804
805   if (!user_id.empty()) {
806     int r = rgw_get_user_info_by_uid(store, user_id, user_info);
807     if (r < 0)
808       return r;
809
810     op_state.display_name = user_info.display_name;
811   }
812
813   clear_failure();
814   return 0;
815 }
816
817 int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
818 {
819   if (!op_state.is_user_op()) {
820     set_err_msg(err_msg, "empty user id");
821     return -EINVAL;
822   }
823
824   string bucket_id = op_state.get_bucket_id();
825   if (bucket_id.empty()) {
826     set_err_msg(err_msg, "empty bucket instance id");
827     return -EINVAL;
828   }
829
830   std::string display_name = op_state.get_user_display_name();
831   rgw_bucket bucket = op_state.get_bucket();
832
833   const rgw_pool& root_pool = store->get_zone_params().domain_root;
834   rgw_raw_obj obj(root_pool, bucket.name);
835   RGWObjVersionTracker objv_tracker;
836
837   map<string, bufferlist> attrs;
838   RGWBucketInfo bucket_info;
839
840   string key = bucket.name + ":" + bucket_id;
841   RGWObjectCtx obj_ctx(store);
842   int r = store->get_bucket_instance_info(obj_ctx, key, bucket_info, NULL, &attrs);
843   if (r < 0) {
844     return r;
845   }
846
847   rgw_user user_id = op_state.get_user_id();
848
849   map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
850   if (aiter != attrs.end()) {
851     bufferlist aclbl = aiter->second;
852     RGWAccessControlPolicy policy;
853     ACLOwner owner;
854     try {
855      bufferlist::iterator iter = aclbl.begin();
856      ::decode(policy, iter);
857      owner = policy.get_owner();
858     } catch (buffer::error& err) {
859       set_err_msg(err_msg, "couldn't decode policy");
860       return -EIO;
861     }
862
863     r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false);
864     if (r < 0) {
865       set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
866       return r;
867     }
868
869     // now update the user for the bucket...
870     if (display_name.empty()) {
871       ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
872     }
873     policy.create_default(user_info.user_id, display_name);
874
875     owner = policy.get_owner();
876     r = store->set_bucket_owner(bucket_info.bucket, owner);
877     if (r < 0) {
878       set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r));
879       return r;
880     }
881
882     // ...and encode the acl
883     aclbl.clear();
884     policy.encode(aclbl);
885
886     r = store->system_obj_set_attr(NULL, obj, RGW_ATTR_ACL, aclbl, &objv_tracker);
887     if (r < 0) {
888       return r;
889     }
890
891     RGWAccessControlPolicy policy_instance;
892     policy_instance.create_default(user_info.user_id, display_name);
893     aclbl.clear();
894     policy_instance.encode(aclbl);
895
896     string oid_bucket_instance = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
897     rgw_raw_obj obj_bucket_instance(root_pool, oid_bucket_instance);
898     r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
899     if (r < 0) {
900       return r;
901     }
902
903     r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket,
904                         ceph::real_time());
905     if (r < 0) {
906       return r;
907     }
908   }
909
910   return 0;
911 }
912
913 int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg)
914 {
915   rgw_bucket bucket = op_state.get_bucket();
916
917   if (!op_state.is_user_op()) {
918     set_err_msg(err_msg, "could not fetch user or user bucket info");
919     return -EINVAL;
920   }
921
922   int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name);
923   if (r < 0) {
924     set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
925   }
926
927   return r;
928 }
929
930 int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc,
931                       bool keep_index_consistent, std::string *err_msg)
932 {
933   bool delete_children = op_state.will_delete_children();
934   rgw_bucket bucket = op_state.get_bucket();
935   int ret;
936
937   if (bypass_gc) {
938     if (delete_children) {
939       ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent);
940     } else {
941       set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
942       return -EINVAL;
943     }
944   } else {
945     ret = rgw_remove_bucket(store, bucket, delete_children);
946   }
947
948   if (ret < 0) {
949     set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
950     return ret;
951   }
952
953   return 0;
954 }
955
956 int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
957 {
958   rgw_bucket bucket = op_state.get_bucket();
959   std::string object_name = op_state.get_object_name();
960
961   rgw_obj_key key(object_name);
962
963   int ret = rgw_remove_object(store, bucket_info, bucket, key);
964   if (ret < 0) {
965     set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
966     return ret;
967   }
968
969   return 0;
970 }
971
972 static void dump_bucket_index(map<string, rgw_bucket_dir_entry> result,  Formatter *f)
973 {
974   map<string, rgw_bucket_dir_entry>::iterator iter;
975   for (iter = result.begin(); iter != result.end(); ++iter) {
976     f->dump_string("object", iter->first);
977    }
978 }
979
980 static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
981 {
982   map<RGWObjCategory, RGWStorageStats>::iterator iter;
983
984   formatter->open_object_section("usage");
985   for (iter = stats.begin(); iter != stats.end(); ++iter) {
986     RGWStorageStats& s = iter->second;
987     const char *cat_name = rgw_obj_category_name(iter->first);
988     formatter->open_object_section(cat_name);
989     s.dump(formatter);
990     formatter->close_section();
991   }
992   formatter->close_section();
993 }
994
995 static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
996         map<RGWObjCategory, RGWStorageStats> calculated_stats,
997         Formatter *formatter)
998 {
999   formatter->open_object_section("check_result");
1000   formatter->open_object_section("existing_header");
1001   dump_bucket_usage(existing_stats, formatter);
1002   formatter->close_section();
1003   formatter->open_object_section("calculated_header");
1004   dump_bucket_usage(calculated_stats, formatter);
1005   formatter->close_section();
1006   formatter->close_section();
1007 }
1008
1009 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
1010                RGWFormatterFlusher& flusher ,std::string *err_msg)
1011 {
1012   bool fix_index = op_state.will_fix_index();
1013   rgw_bucket bucket = op_state.get_bucket();
1014
1015   size_t max = 1000;
1016
1017   map<string, bool> common_prefixes;
1018
1019   bool is_truncated;
1020   map<string, bool> meta_objs;
1021   map<rgw_obj_index_key, string> all_objs;
1022
1023   RGWBucketInfo bucket_info;
1024   RGWObjectCtx obj_ctx(store);
1025   int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
1026   if (r < 0) {
1027     ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
1028     return r;
1029   }
1030
1031   RGWRados::Bucket target(store, bucket_info);
1032   RGWRados::Bucket::List list_op(&target);
1033
1034   list_op.params.list_versions = true;
1035   list_op.params.ns = RGW_OBJ_NS_MULTIPART;
1036
1037   do {
1038     vector<rgw_bucket_dir_entry> result;
1039     int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated);
1040     if (r < 0) {
1041       set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
1042               " err=" +  cpp_strerror(-r));
1043
1044       return r;
1045     }
1046
1047     vector<rgw_bucket_dir_entry>::iterator iter;
1048     for (iter = result.begin(); iter != result.end(); ++iter) {
1049       rgw_obj_index_key key = iter->key;
1050       rgw_obj obj(bucket, key);
1051       string oid = obj.get_oid();
1052
1053       int pos = oid.find_last_of('.');
1054       if (pos < 0) {
1055         /* obj has no suffix */
1056         all_objs[key] = oid;
1057       } else {
1058         /* obj has suffix */
1059         string name = oid.substr(0, pos);
1060         string suffix = oid.substr(pos + 1);
1061
1062         if (suffix.compare("meta") == 0) {
1063           meta_objs[name] = true;
1064         } else {
1065           all_objs[key] = name;
1066         }
1067       }
1068     }
1069
1070   } while (is_truncated);
1071
1072   list<rgw_obj_index_key> objs_to_unlink;
1073   Formatter *f =  flusher.get_formatter();
1074
1075   f->open_array_section("invalid_multipart_entries");
1076
1077   for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
1078     string& name = aiter->second;
1079
1080     if (meta_objs.find(name) == meta_objs.end()) {
1081       objs_to_unlink.push_back(aiter->first);
1082     }
1083
1084     if (objs_to_unlink.size() > max) {
1085       if (fix_index) {
1086         int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1087         if (r < 0) {
1088           set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1089                       cpp_strerror(-r));
1090           return r;
1091         }
1092       }
1093
1094       dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1095       flusher.flush();
1096       objs_to_unlink.clear();
1097     }
1098   }
1099
1100   if (fix_index) {
1101     int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1102     if (r < 0) {
1103       set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1104               cpp_strerror(-r));
1105
1106       return r;
1107     }
1108   }
1109
1110   dump_mulipart_index_results(objs_to_unlink, f);
1111   f->close_section();
1112   flusher.flush();
1113
1114   return 0;
1115 }
1116
1117 int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1118                                   RGWFormatterFlusher& flusher,
1119                                   std::string *err_msg)
1120 {
1121
1122   bool fix_index = op_state.will_fix_index();
1123
1124   rgw_bucket bucket = op_state.get_bucket();
1125
1126   if (!fix_index) {
1127     set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1128     return -EINVAL;
1129   }
1130
1131   store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1132
1133   string prefix;
1134   rgw_obj_index_key marker;
1135   bool is_truncated = true;
1136
1137   Formatter *formatter = flusher.get_formatter();
1138   formatter->open_object_section("objects");
1139   while (is_truncated) {
1140     map<string, rgw_bucket_dir_entry> result;
1141
1142     int r = store->cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, 1000, true,
1143                                    result, &is_truncated, &marker,
1144                                    bucket_object_check_filter);
1145     if (r == -ENOENT) {
1146       break;
1147     } else if (r < 0 && r != -ENOENT) {
1148       set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1149     }
1150
1151
1152     dump_bucket_index(result, formatter);
1153     flusher.flush();
1154
1155   }
1156
1157   formatter->close_section();
1158
1159   store->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1160
1161   return 0;
1162 }
1163
1164
1165 int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1166         map<RGWObjCategory, RGWStorageStats>& existing_stats,
1167         map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1168         std::string *err_msg)
1169 {
1170   rgw_bucket bucket = op_state.get_bucket();
1171   bool fix_index = op_state.will_fix_index();
1172
1173   int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1174   if (r < 0) {
1175     set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1176     return r;
1177   }
1178
1179   if (fix_index) {
1180     r = store->bucket_rebuild_index(bucket_info);
1181     if (r < 0) {
1182       set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1183       return r;
1184     }
1185   }
1186
1187   return 0;
1188 }
1189
1190
1191 int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1192 {
1193   RGWAccessControlPolicy_S3 policy(g_ceph_context);
1194   bufferlist::iterator iter = bl.begin();
1195   try {
1196     policy.decode(iter);
1197   } catch (buffer::error& err) {
1198     dout(0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1199     return -EIO;
1200   }
1201   policy.to_xml(o);
1202   return 0;
1203 }
1204
1205 static int policy_decode(RGWRados *store, bufferlist& bl, RGWAccessControlPolicy& policy)
1206 {
1207   bufferlist::iterator iter = bl.begin();
1208   try {
1209     policy.decode(iter);
1210   } catch (buffer::error& err) {
1211     ldout(store->ctx(), 0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1212     return -EIO;
1213   }
1214   return 0;
1215 }
1216
1217 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy)
1218 {
1219   std::string object_name = op_state.get_object_name();
1220   rgw_bucket bucket = op_state.get_bucket();
1221   RGWObjectCtx obj_ctx(store);
1222
1223   RGWBucketInfo bucket_info;
1224   map<string, bufferlist> attrs;
1225   int ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
1226   if (ret < 0) {
1227     return ret;
1228   }
1229
1230   if (!object_name.empty()) {
1231     bufferlist bl;
1232     rgw_obj obj(bucket, object_name);
1233
1234     RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
1235     RGWRados::Object::Read rop(&op_target);
1236
1237     int ret = rop.get_attr(RGW_ATTR_ACL, bl);
1238     if (ret < 0)
1239       return ret;
1240
1241     return policy_decode(store, bl, policy);
1242   }
1243
1244   map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1245   if (aiter == attrs.end()) {
1246     return -ENOENT;
1247   }
1248
1249   return policy_decode(store, aiter->second, policy);
1250 }
1251
1252
1253 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1254                   RGWAccessControlPolicy& policy)
1255 {
1256   RGWBucket bucket;
1257
1258   int ret = bucket.init(store, op_state);
1259   if (ret < 0)
1260     return ret;
1261
1262   ret = bucket.get_policy(op_state, policy);
1263   if (ret < 0)
1264     return ret;
1265
1266   return 0;
1267 }
1268
1269 /* Wrappers to facilitate RESTful interface */
1270
1271
1272 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1273                   RGWFormatterFlusher& flusher)
1274 {
1275   RGWAccessControlPolicy policy(store->ctx());
1276
1277   int ret = get_policy(store, op_state, policy);
1278   if (ret < 0)
1279     return ret;
1280
1281   Formatter *formatter = flusher.get_formatter();
1282
1283   flusher.start(0);
1284
1285   formatter->open_object_section("policy");
1286   policy.dump(formatter);
1287   formatter->close_section();
1288
1289   flusher.flush();
1290
1291   return 0;
1292 }
1293
1294 int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1295                   ostream& os)
1296 {
1297   RGWAccessControlPolicy_S3 policy(store->ctx());
1298
1299   int ret = get_policy(store, op_state, policy);
1300   if (ret < 0)
1301     return ret;
1302
1303   policy.to_xml(os);
1304
1305   return 0;
1306 }
1307
1308 int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state)
1309 {
1310   RGWBucket bucket;
1311
1312   int ret = bucket.init(store, op_state);
1313   if (ret < 0)
1314     return ret;
1315
1316   return bucket.unlink(op_state);
1317 }
1318
1319 int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err)
1320 {
1321   RGWBucket bucket;
1322
1323   int ret = bucket.init(store, op_state);
1324   if (ret < 0)
1325     return ret;
1326
1327   return bucket.link(op_state, err);
1328
1329 }
1330
1331 int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
1332                   RGWFormatterFlusher& flusher)
1333 {
1334   int ret;
1335   map<RGWObjCategory, RGWStorageStats> existing_stats;
1336   map<RGWObjCategory, RGWStorageStats> calculated_stats;
1337
1338
1339   RGWBucket bucket;
1340
1341   ret = bucket.init(store, op_state);
1342   if (ret < 0)
1343     return ret;
1344
1345   Formatter *formatter = flusher.get_formatter();
1346   flusher.start(0);
1347
1348   ret = bucket.check_bad_index_multipart(op_state, flusher);
1349   if (ret < 0)
1350     return ret;
1351
1352   ret = bucket.check_object_index(op_state, flusher);
1353   if (ret < 0)
1354     return ret;
1355
1356   ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1357   if (ret < 0)
1358     return ret;
1359
1360   dump_index_check(existing_stats, calculated_stats, formatter);
1361   flusher.flush();
1362
1363   return 0;
1364 }
1365
1366 int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state,
1367                                     bool bypass_gc, bool keep_index_consistent)
1368 {
1369   RGWBucket bucket;
1370
1371   int ret = bucket.init(store, op_state);
1372   if (ret < 0)
1373     return ret;
1374
1375   std::string err_msg;
1376   ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg);
1377   if (!err_msg.empty()) {
1378     lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1379   }
1380   return ret;
1381 }
1382
1383 int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state)
1384 {
1385   RGWBucket bucket;
1386
1387   int ret = bucket.init(store, op_state);
1388   if (ret < 0)
1389     return ret;
1390
1391   return bucket.remove_object(op_state);
1392 }
1393
1394 static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string&  bucket_name, Formatter *formatter)
1395 {
1396   RGWBucketInfo bucket_info;
1397   map<RGWObjCategory, RGWStorageStats> stats;
1398
1399   real_time mtime;
1400   RGWObjectCtx obj_ctx(store);
1401   int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime);
1402   if (r < 0)
1403     return r;
1404
1405   rgw_bucket& bucket = bucket_info.bucket;
1406
1407   string bucket_ver, master_ver;
1408   string max_marker;
1409   int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1410   if (ret < 0) {
1411     cerr << "error getting bucket stats ret=" << ret << std::endl;
1412     return ret;
1413   }
1414
1415   utime_t ut(mtime);
1416
1417   formatter->open_object_section("stats");
1418   formatter->dump_string("bucket", bucket.name);
1419   formatter->dump_string("zonegroup", bucket_info.zonegroup);
1420   formatter->dump_string("placement_rule", bucket_info.placement_rule);
1421   ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
1422   formatter->dump_string("id", bucket.bucket_id);
1423   formatter->dump_string("marker", bucket.marker);
1424   formatter->dump_stream("index_type") << bucket_info.index_type;
1425   ::encode_json("owner", bucket_info.owner, formatter);
1426   formatter->dump_string("ver", bucket_ver);
1427   formatter->dump_string("master_ver", master_ver);
1428   formatter->dump_stream("mtime") << ut;
1429   formatter->dump_string("max_marker", max_marker);
1430   dump_bucket_usage(stats, formatter);
1431   encode_json("bucket_quota", bucket_info.quota, formatter);
1432   formatter->close_section();
1433
1434   return 0;
1435 }
1436
1437 int RGWBucketAdminOp::limit_check(RGWRados *store,
1438                                   RGWBucketAdminOpState& op_state,
1439                                   const std::list<std::string>& user_ids,
1440                                   RGWFormatterFlusher& flusher,
1441                                   bool warnings_only)
1442 {
1443   int ret = 0;
1444   const size_t max_entries =
1445     store->ctx()->_conf->rgw_list_buckets_max_chunk;
1446
1447   const size_t safe_max_objs_per_shard =
1448     store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1449
1450   uint16_t shard_warn_pct =
1451     store->ctx()->_conf->rgw_shard_warning_threshold;
1452   if (shard_warn_pct > 100)
1453     shard_warn_pct = 90;
1454
1455   Formatter *formatter = flusher.get_formatter();
1456   flusher.start(0);
1457
1458   formatter->open_array_section("users");
1459
1460   for (const auto& user_id : user_ids) {
1461     formatter->open_object_section("user");
1462     formatter->dump_string("user_id", user_id);
1463     bool done;
1464     formatter->open_array_section("buckets");
1465     do {
1466       RGWUserBuckets buckets;
1467       string marker;
1468       bool is_truncated;
1469
1470       ret = rgw_read_user_buckets(store, user_id, buckets,
1471                                   marker, string(), max_entries, false,
1472                                   &is_truncated);
1473       if (ret < 0)
1474         return ret;
1475
1476       map<string, RGWBucketEnt>& m_buckets = buckets.get_buckets();
1477
1478       for (const auto& iter : m_buckets) {
1479         auto& bucket = iter.second.bucket;
1480         uint32_t num_shards = 1;
1481         uint64_t num_objects = 0;
1482
1483         /* need info for num_shards */
1484         RGWBucketInfo info;
1485         RGWObjectCtx obj_ctx(store);
1486
1487         marker = bucket.name; /* Casey's location for marker update,
1488                                * as we may now not reach the end of
1489                                * the loop body */
1490
1491         ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name,
1492                                      info, nullptr);
1493         if (ret < 0)
1494           continue;
1495
1496         /* need stats for num_entries */
1497         string bucket_ver, master_ver;
1498         std::map<RGWObjCategory, RGWStorageStats> stats;
1499         ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1500                                       &master_ver, stats, nullptr);
1501
1502         if (ret < 0)
1503           continue;
1504
1505         for (const auto& s : stats) {
1506             num_objects += s.second.num_objects;
1507         }
1508
1509         num_shards = info.num_shards;
1510         uint64_t objs_per_shard =
1511           (num_shards) ? num_objects/num_shards : num_objects;
1512         {
1513           bool warn = false;
1514           stringstream ss;
1515           if (objs_per_shard > safe_max_objs_per_shard) {
1516             double over =
1517               100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1518               ss << boost::format("OVER %4f%%") % over;
1519               warn = true;
1520           } else {
1521             double fill_pct =
1522               objs_per_shard / safe_max_objs_per_shard * 100;
1523             if (fill_pct >= shard_warn_pct) {
1524               ss << boost::format("WARN %4f%%") % fill_pct;
1525               warn = true;
1526             } else {
1527               ss << "OK";
1528             }
1529           }
1530
1531           if (warn || (! warnings_only)) {
1532             formatter->open_object_section("bucket");
1533             formatter->dump_string("bucket", bucket.name);
1534             formatter->dump_string("tenant", bucket.tenant);
1535             formatter->dump_int("num_objects", num_objects);
1536             formatter->dump_int("num_shards", num_shards);
1537             formatter->dump_int("objects_per_shard", objs_per_shard);
1538             formatter->dump_string("fill_status", ss.str());
1539             formatter->close_section();
1540           }
1541         }
1542       }
1543
1544       done = (m_buckets.size() < max_entries);
1545     } while (!done); /* foreach: bucket */
1546
1547     formatter->close_section();
1548     formatter->close_section();
1549     formatter->flush(cout);
1550
1551   } /* foreach: user_id */
1552
1553   formatter->close_section();
1554   formatter->flush(cout);
1555
1556   return ret;
1557 } /* RGWBucketAdminOp::limit_check */
1558
1559 int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
1560                   RGWFormatterFlusher& flusher)
1561 {
1562   RGWBucket bucket;
1563   int ret;
1564
1565   string bucket_name = op_state.get_bucket_name();
1566
1567   if (!bucket_name.empty()) {
1568     ret = bucket.init(store, op_state);
1569     if (ret < 0)
1570       return ret;
1571   }
1572
1573   Formatter *formatter = flusher.get_formatter();
1574   flusher.start(0);
1575
1576   CephContext *cct = store->ctx();
1577
1578   const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1579
1580   bool show_stats = op_state.will_fetch_stats();
1581   rgw_user user_id = op_state.get_user_id();
1582   if (op_state.is_user_op()) {
1583     formatter->open_array_section("buckets");
1584
1585     RGWUserBuckets buckets;
1586     string marker;
1587     bool is_truncated = false;
1588
1589     do {
1590       ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets,
1591                                   marker, string(), max_entries, false,
1592                                   &is_truncated);
1593       if (ret < 0)
1594         return ret;
1595
1596       map<string, RGWBucketEnt>& m = buckets.get_buckets();
1597       map<string, RGWBucketEnt>::iterator iter;
1598
1599       for (iter = m.begin(); iter != m.end(); ++iter) {
1600         std::string  obj_name = iter->first;
1601         if (show_stats)
1602           bucket_stats(store, user_id.tenant, obj_name, formatter);
1603         else
1604           formatter->dump_string("bucket", obj_name);
1605
1606         marker = obj_name;
1607       }
1608
1609       flusher.flush();
1610     } while (is_truncated);
1611
1612     formatter->close_section();
1613   } else if (!bucket_name.empty()) {
1614     bucket_stats(store, user_id.tenant, bucket_name, formatter);
1615   } else {
1616     RGWAccessHandle handle;
1617
1618     formatter->open_array_section("buckets");
1619     if (store->list_buckets_init(&handle) >= 0) {
1620       rgw_bucket_dir_entry obj;
1621       while (store->list_buckets_next(obj, &handle) >= 0) {
1622         if (show_stats)
1623           bucket_stats(store, user_id.tenant, obj.key.name, formatter);
1624         else
1625           formatter->dump_string("bucket", obj.key.name);
1626       }
1627     }
1628
1629     formatter->close_section();
1630   }
1631
1632   flusher.flush();
1633
1634   return 0;
1635 }
1636
1637
1638 void rgw_data_change::dump(Formatter *f) const
1639 {
1640   string type;
1641   switch (entity_type) {
1642     case ENTITY_TYPE_BUCKET:
1643       type = "bucket";
1644       break;
1645     default:
1646       type = "unknown";
1647   }
1648   encode_json("entity_type", type, f);
1649   encode_json("key", key, f);
1650   utime_t ut(timestamp);
1651   encode_json("timestamp", ut, f);
1652 }
1653
1654 void rgw_data_change::decode_json(JSONObj *obj) {
1655   string s;
1656   JSONDecoder::decode_json("entity_type", s, obj);
1657   if (s == "bucket") {
1658     entity_type = ENTITY_TYPE_BUCKET;
1659   } else {
1660     entity_type = ENTITY_TYPE_UNKNOWN;
1661   }
1662   JSONDecoder::decode_json("key", key, obj);
1663   utime_t ut;
1664   JSONDecoder::decode_json("timestamp", ut, obj);
1665   timestamp = ut.to_real_time();
1666 }
1667
1668 void rgw_data_change_log_entry::dump(Formatter *f) const
1669 {
1670   encode_json("log_id", log_id, f);
1671   utime_t ut(log_timestamp);
1672   encode_json("log_timestamp", ut, f);
1673   encode_json("entry", entry, f);
1674 }
1675
1676 void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
1677   JSONDecoder::decode_json("log_id", log_id, obj);
1678   utime_t ut;
1679   JSONDecoder::decode_json("log_timestamp", ut, obj);
1680   log_timestamp = ut.to_real_time();
1681   JSONDecoder::decode_json("entry", entry, obj);
1682 }
1683
1684 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
1685     const string& name = bs.bucket.name;
1686     int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
1687     uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
1688
1689     return (int)r;
1690 }
1691
1692 int RGWDataChangesLog::renew_entries()
1693 {
1694   if (!store->need_to_log_data())
1695     return 0;
1696
1697   /* we can't keep the bucket name as part of the cls_log_entry, and we need
1698    * it later, so we keep two lists under the map */
1699   map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
1700
1701   lock.Lock();
1702   map<rgw_bucket_shard, bool> entries;
1703   entries.swap(cur_cycle);
1704   lock.Unlock();
1705
1706   map<rgw_bucket_shard, bool>::iterator iter;
1707   string section;
1708   real_time ut = real_clock::now();
1709   for (iter = entries.begin(); iter != entries.end(); ++iter) {
1710     const rgw_bucket_shard& bs = iter->first;
1711
1712     int index = choose_oid(bs);
1713
1714     cls_log_entry entry;
1715
1716     rgw_data_change change;
1717     bufferlist bl;
1718     change.entity_type = ENTITY_TYPE_BUCKET;
1719     change.key = bs.get_key();
1720     change.timestamp = ut;
1721     ::encode(change, bl);
1722
1723     store->time_log_prepare_entry(entry, ut, section, change.key, bl);
1724
1725     m[index].first.push_back(bs);
1726     m[index].second.emplace_back(std::move(entry));
1727   }
1728
1729   map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
1730   for (miter = m.begin(); miter != m.end(); ++miter) {
1731     list<cls_log_entry>& entries = miter->second.second;
1732
1733     real_time now = real_clock::now();
1734
1735     int ret = store->time_log_add(oids[miter->first], entries, NULL);
1736     if (ret < 0) {
1737       /* we don't really need to have a special handling for failed cases here,
1738        * as this is just an optimization. */
1739       lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
1740       return ret;
1741     }
1742
1743     real_time expiration = now;
1744     expiration += make_timespan(cct->_conf->rgw_data_log_window);
1745
1746     list<rgw_bucket_shard>& buckets = miter->second.first;
1747     list<rgw_bucket_shard>::iterator liter;
1748     for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
1749       update_renewed(*liter, expiration);
1750     }
1751   }
1752
1753   return 0;
1754 }
1755
1756 void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
1757 {
1758   assert(lock.is_locked());
1759   if (!changes.find(bs, status)) {
1760     status = ChangeStatusPtr(new ChangeStatus);
1761     changes.add(bs, status);
1762   }
1763 }
1764
1765 void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
1766 {
1767   Mutex::Locker l(lock);
1768   cur_cycle[bs] = true;
1769 }
1770
1771 void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
1772 {
1773   Mutex::Locker l(lock);
1774   ChangeStatusPtr status;
1775   _get_change(bs, status);
1776
1777   ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
1778   status->cur_expiration = expiration;
1779 }
1780
1781 int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
1782   rgw_bucket_shard bs(bucket, shard_id);
1783
1784   return choose_oid(bs);
1785 }
1786
1787 int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
1788   if (!store->need_to_log_data())
1789     return 0;
1790
1791   rgw_bucket_shard bs(bucket, shard_id);
1792
1793   int index = choose_oid(bs);
1794   mark_modified(index, bs);
1795
1796   lock.Lock();
1797
1798   ChangeStatusPtr status;
1799   _get_change(bs, status);
1800
1801   lock.Unlock();
1802
1803   real_time now = real_clock::now();
1804
1805   status->lock->Lock();
1806
1807   ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
1808
1809   if (now < status->cur_expiration) {
1810     /* no need to send, recently completed */
1811     status->lock->Unlock();
1812
1813     register_renew(bs);
1814     return 0;
1815   }
1816
1817   RefCountedCond *cond;
1818
1819   if (status->pending) {
1820     cond = status->cond;
1821
1822     assert(cond);
1823
1824     status->cond->get();
1825     status->lock->Unlock();
1826
1827     int ret = cond->wait();
1828     cond->put();
1829     if (!ret) {
1830       register_renew(bs);
1831     }
1832     return ret;
1833   }
1834
1835   status->cond = new RefCountedCond;
1836   status->pending = true;
1837
1838   string& oid = oids[index];
1839   real_time expiration;
1840
1841   int ret;
1842
1843   do {
1844     status->cur_sent = now;
1845
1846     expiration = now;
1847     expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
1848
1849     status->lock->Unlock();
1850   
1851     bufferlist bl;
1852     rgw_data_change change;
1853     change.entity_type = ENTITY_TYPE_BUCKET;
1854     change.key = bs.get_key();
1855     change.timestamp = now;
1856     ::encode(change, bl);
1857     string section;
1858
1859     ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
1860
1861     ret = store->time_log_add(oid, now, section, change.key, bl);
1862
1863     now = real_clock::now();
1864
1865     status->lock->Lock();
1866
1867   } while (!ret && real_clock::now() > expiration);
1868
1869   cond = status->cond;
1870
1871   status->pending = false;
1872   status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
1873   status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
1874   status->cond = NULL;
1875   status->lock->Unlock();
1876
1877   cond->done(ret);
1878   cond->put();
1879
1880   return ret;
1881 }
1882
1883 int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
1884                                     list<rgw_data_change_log_entry>& entries,
1885                                     const string& marker,
1886                                     string *out_marker,
1887                                     bool *truncated) {
1888   if (shard >= num_shards)
1889     return -EINVAL;
1890
1891   list<cls_log_entry> log_entries;
1892
1893   int ret = store->time_log_list(oids[shard], start_time, end_time,
1894                                  max_entries, log_entries, marker,
1895                                  out_marker, truncated);
1896   if (ret < 0)
1897     return ret;
1898
1899   list<cls_log_entry>::iterator iter;
1900   for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
1901     rgw_data_change_log_entry log_entry;
1902     log_entry.log_id = iter->id;
1903     real_time rt = iter->timestamp.to_real_time();
1904     log_entry.log_timestamp = rt;
1905     bufferlist::iterator liter = iter->data.begin();
1906     try {
1907       ::decode(log_entry.entry, liter);
1908     } catch (buffer::error& err) {
1909       lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
1910       return -EIO;
1911     }
1912     entries.push_back(log_entry);
1913   }
1914
1915   return 0;
1916 }
1917
1918 int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
1919              list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
1920   bool truncated;
1921   entries.clear();
1922
1923   for (; marker.shard < num_shards && (int)entries.size() < max_entries;
1924        marker.shard++, marker.marker.clear()) {
1925     int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
1926                            marker.marker, NULL, &truncated);
1927     if (ret == -ENOENT) {
1928       continue;
1929     }
1930     if (ret < 0) {
1931       return ret;
1932     }
1933     if (truncated) {
1934       *ptruncated = true;
1935       return 0;
1936     }
1937   }
1938
1939   *ptruncated = (marker.shard < num_shards);
1940
1941   return 0;
1942 }
1943
1944 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
1945 {
1946   if (shard_id >= num_shards)
1947     return -EINVAL;
1948
1949   string oid = oids[shard_id];
1950
1951   cls_log_header header;
1952
1953   int ret = store->time_log_info(oid, &header);
1954   if ((ret < 0) && (ret != -ENOENT))
1955     return ret;
1956
1957   info->marker = header.max_marker;
1958   info->last_update = header.max_time.to_real_time();
1959
1960   return 0;
1961 }
1962
1963 int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
1964                                     const string& start_marker, const string& end_marker)
1965 {
1966   int ret;
1967
1968   if (shard_id > num_shards)
1969     return -EINVAL;
1970
1971   ret = store->time_log_trim(oids[shard_id], start_time, end_time, start_marker, end_marker);
1972
1973   if (ret == -ENOENT || ret == -ENODATA)
1974     ret = 0;
1975
1976   return ret;
1977 }
1978
1979 int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time& end_time,
1980                                     const string& start_marker, const string& end_marker)
1981 {
1982   for (int shard = 0; shard < num_shards; shard++) {
1983     int ret = store->time_log_trim(oids[shard], start_time, end_time, start_marker, end_marker);
1984     if (ret == -ENOENT || ret == -ENODATA) {
1985       continue;
1986     }
1987     if (ret < 0)
1988       return ret;
1989   }
1990
1991   return 0;
1992 }
1993
1994 bool RGWDataChangesLog::going_down()
1995 {
1996   return down_flag;
1997 }
1998
1999 RGWDataChangesLog::~RGWDataChangesLog() {
2000   down_flag = true;
2001   renew_thread->stop();
2002   renew_thread->join();
2003   delete renew_thread;
2004   delete[] oids;
2005 }
2006
2007 void *RGWDataChangesLog::ChangesRenewThread::entry() {
2008   do {
2009     dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2010     int r = log->renew_entries();
2011     if (r < 0) {
2012       dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2013     }
2014
2015     if (log->going_down())
2016       break;
2017
2018     int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2019     lock.Lock();
2020     cond.WaitInterval(lock, utime_t(interval, 0));
2021     lock.Unlock();
2022   } while (!log->going_down());
2023
2024   return NULL;
2025 }
2026
2027 void RGWDataChangesLog::ChangesRenewThread::stop()
2028 {
2029   Mutex::Locker l(lock);
2030   cond.Signal();
2031 }
2032
2033 void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2034 {
2035   auto key = bs.get_key();
2036   modified_lock.get_read();
2037   map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
2038   if (iter != modified_shards.end()) {
2039     set<string>& keys = iter->second;
2040     if (keys.find(key) != keys.end()) {
2041       modified_lock.unlock();
2042       return;
2043     }
2044   }
2045   modified_lock.unlock();
2046
2047   RWLock::WLocker wl(modified_lock);
2048   modified_shards[shard_id].insert(key);
2049 }
2050
2051 void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2052 {
2053   RWLock::WLocker wl(modified_lock);
2054   modified.swap(modified_shards);
2055   modified_shards.clear();
2056 }
2057
2058 void RGWBucketCompleteInfo::dump(Formatter *f) const {
2059   encode_json("bucket_info", info, f);
2060   encode_json("attrs", attrs, f);
2061 }
2062
2063 void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2064   JSONDecoder::decode_json("bucket_info", info, obj);
2065   JSONDecoder::decode_json("attrs", attrs, obj);
2066 }
2067
2068 class RGWBucketMetadataHandler : public RGWMetadataHandler {
2069
2070 public:
2071   string get_type() override { return "bucket"; }
2072
2073   int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
2074     RGWObjVersionTracker ot;
2075     RGWBucketEntryPoint be;
2076
2077     real_time mtime;
2078     map<string, bufferlist> attrs;
2079     RGWObjectCtx obj_ctx(store);
2080
2081     string tenant_name, bucket_name;
2082     parse_bucket(entry, &tenant_name, &bucket_name);
2083     int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
2084     if (ret < 0)
2085       return ret;
2086
2087     RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime);
2088
2089     *obj = mdo;
2090
2091     return 0;
2092   }
2093
2094   int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2095           real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2096     RGWBucketEntryPoint be, old_be;
2097     try {
2098       decode_json_obj(be, obj);
2099     } catch (JSONDecoder::err& e) {
2100       return -EINVAL;
2101     }
2102
2103     real_time orig_mtime;
2104     map<string, bufferlist> attrs;
2105
2106     RGWObjVersionTracker old_ot;
2107     RGWObjectCtx obj_ctx(store);
2108
2109     string tenant_name, bucket_name;
2110     parse_bucket(entry, &tenant_name, &bucket_name);
2111     int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
2112     if (ret < 0 && ret != -ENOENT)
2113       return ret;
2114
2115     // are we actually going to perform this put, or is it too old?
2116     if (ret != -ENOENT &&
2117         !check_versions(old_ot.read_version, orig_mtime,
2118                         objv_tracker.write_version, mtime, sync_type)) {
2119       return STATUS_NO_APPLY;
2120     }
2121
2122     objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
2123
2124     ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
2125     if (ret < 0)
2126       return ret;
2127
2128     /* link bucket */
2129     if (be.linked) {
2130       ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false);
2131     } else {
2132       ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant,
2133                               be.bucket.name, false);
2134     }
2135
2136     return ret;
2137   }
2138
2139   struct list_keys_info {
2140     RGWRados *store;
2141     RGWListRawObjsCtx ctx;
2142   };
2143
2144   int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2145     RGWBucketEntryPoint be;
2146     RGWObjectCtx obj_ctx(store);
2147
2148     string tenant_name, bucket_name;
2149     parse_bucket(entry, &tenant_name, &bucket_name);
2150     int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
2151     if (ret < 0)
2152       return ret;
2153
2154     /*
2155      * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2156      * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2157      * will incorrectly fail.
2158      */
2159     ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
2160     if (ret < 0) {
2161       lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2162     }
2163
2164     ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
2165     if (ret < 0) {
2166       lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
2167     }
2168     /* idempotent */
2169     return 0;
2170   }
2171
2172   void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2173     oid = key;
2174     pool = store->get_zone_params().domain_root;
2175   }
2176
2177   int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2178     auto info = ceph::make_unique<list_keys_info>();
2179
2180     info->store = store;
2181
2182     int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2183                                            &info->ctx);
2184     if (ret < 0) {
2185       return ret;
2186     }
2187     *phandle = (void *)info.release();
2188
2189     return 0;
2190   }
2191
2192   int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2193     list_keys_info *info = static_cast<list_keys_info *>(handle);
2194
2195     string no_filter;
2196
2197     keys.clear();
2198
2199     RGWRados *store = info->store;
2200
2201     list<string> unfiltered_keys;
2202
2203     int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2204                                            unfiltered_keys, truncated);
2205     if (ret < 0 && ret != -ENOENT)
2206       return ret;
2207     if (ret == -ENOENT) {
2208       if (truncated)
2209         *truncated = false;
2210       return 0;
2211     }
2212
2213     // now filter out the system entries
2214     list<string>::iterator iter;
2215     for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2216       string& k = *iter;
2217
2218       if (k[0] != '.') {
2219         keys.push_back(k);
2220       }
2221     }
2222
2223     return 0;
2224   }
2225
2226   void list_keys_complete(void *handle) override {
2227     list_keys_info *info = static_cast<list_keys_info *>(handle);
2228     delete info;
2229   }
2230
2231   string get_marker(void *handle) {
2232     list_keys_info *info = static_cast<list_keys_info *>(handle);
2233     return info->store->list_raw_objs_get_cursor(info->ctx);
2234   }
2235 };
2236
2237 class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
2238
2239 public:
2240   string get_type() override { return "bucket.instance"; }
2241
2242   int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override {
2243     RGWBucketCompleteInfo bci;
2244
2245     real_time mtime;
2246     RGWObjectCtx obj_ctx(store);
2247
2248     int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs);
2249     if (ret < 0)
2250       return ret;
2251
2252     RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2253
2254     *obj = mdo;
2255
2256     return 0;
2257   }
2258
2259   int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2260           real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2261     RGWBucketCompleteInfo bci, old_bci;
2262     try {
2263       decode_json_obj(bci, obj);
2264     } catch (JSONDecoder::err& e) {
2265       return -EINVAL;
2266     }
2267
2268     real_time orig_mtime;
2269     RGWObjectCtx obj_ctx(store);
2270
2271     int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info,
2272             &orig_mtime, &old_bci.attrs);
2273     bool exists = (ret != -ENOENT);
2274     if (ret < 0 && exists)
2275       return ret;
2276
2277     if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) {
2278       /* a new bucket, we need to select a new bucket placement for it */
2279       auto key(entry);
2280       rgw_bucket_instance_oid_to_key(key);
2281       string tenant_name;
2282       string bucket_name;
2283       string bucket_instance;
2284       parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance);
2285
2286       RGWZonePlacementInfo rule_info;
2287       bci.info.bucket.name = bucket_name;
2288       bci.info.bucket.bucket_id = bucket_instance;
2289       bci.info.bucket.tenant = tenant_name;
2290       ret = store->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
2291       if (ret < 0) {
2292         ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
2293         return ret;
2294       }
2295       bci.info.index_type = rule_info.index_type;
2296     } else {
2297       /* existing bucket, keep its placement */
2298       bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement;
2299       bci.info.placement_rule = old_bci.info.placement_rule;
2300     }
2301
2302     if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) {
2303       int shards_num = bci.info.num_shards? bci.info.num_shards : 1;
2304       int shard_id = bci.info.num_shards? 0 : -1;
2305
2306       if (!bci.info.datasync_flag_enabled()) {
2307       ret = store->stop_bi_log_entries(bci.info, -1);
2308         if (ret < 0) {
2309            lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2310            return ret;
2311         }
2312       } else {
2313         ret = store->resync_bi_log_entries(bci.info, -1);
2314         if (ret < 0) {
2315            lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2316            return ret;
2317         }
2318       }
2319
2320       for (int i = 0; i < shards_num; ++i, ++shard_id) {
2321         ret = store->data_log->add_entry(bci.info.bucket, shard_id);
2322         if (ret < 0) {
2323            lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
2324            return ret;
2325         }
2326       }
2327     }
2328
2329     // are we actually going to perform this put, or is it too old?
2330     if (exists &&
2331         !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
2332                         objv_tracker.write_version, mtime, sync_type)) {
2333       objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2334       return STATUS_NO_APPLY;
2335     }
2336
2337     /* record the read version (if any), store the new version */
2338     bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2339     bci.info.objv_tracker.write_version = objv_tracker.write_version;
2340
2341     ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs);
2342     if (ret < 0)
2343       return ret;
2344
2345     objv_tracker = bci.info.objv_tracker;
2346
2347     ret = store->init_bucket_index(bci.info, bci.info.num_shards);
2348     if (ret < 0)
2349       return ret;
2350
2351     return STATUS_APPLIED;
2352   }
2353
2354   struct list_keys_info {
2355     RGWRados *store;
2356     RGWListRawObjsCtx ctx;
2357   };
2358
2359   int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2360     RGWBucketInfo info;
2361     RGWObjectCtx obj_ctx(store);
2362
2363     int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL);
2364     if (ret < 0 && ret != -ENOENT)
2365       return ret;
2366
2367     return rgw_bucket_instance_remove_entry(store, entry, &info.objv_tracker);
2368   }
2369
2370   void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2371     oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
2372     rgw_bucket_instance_key_to_oid(oid);
2373     pool = store->get_zone_params().domain_root;
2374   }
2375
2376   int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2377     auto info = ceph::make_unique<list_keys_info>();
2378
2379     info->store = store;
2380
2381     int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2382                                            &info->ctx);
2383     if (ret < 0) {
2384       return ret;
2385     }
2386     *phandle = (void *)info.release();
2387
2388     return 0;
2389   }
2390
2391   int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2392     list_keys_info *info = static_cast<list_keys_info *>(handle);
2393
2394     string no_filter;
2395
2396     keys.clear();
2397
2398     RGWRados *store = info->store;
2399
2400     list<string> unfiltered_keys;
2401
2402     int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2403                                            unfiltered_keys, truncated);
2404     if (ret < 0 && ret != -ENOENT)
2405       return ret;
2406     if (ret == -ENOENT) {
2407       if (truncated)
2408         *truncated = false;
2409       return 0;
2410     }
2411
2412     constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1;
2413     // now filter in the relevant entries
2414     list<string>::iterator iter;
2415     for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2416       string& k = *iter;
2417
2418       if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) {
2419         auto oid = k.substr(prefix_size);
2420         rgw_bucket_instance_oid_to_key(oid);
2421         keys.emplace_back(std::move(oid));
2422       }
2423     }
2424
2425     return 0;
2426   }
2427
2428   void list_keys_complete(void *handle) override {
2429     list_keys_info *info = static_cast<list_keys_info *>(handle);
2430     delete info;
2431   }
2432
2433   string get_marker(void *handle) {
2434     list_keys_info *info = static_cast<list_keys_info *>(handle);
2435     return info->store->list_raw_objs_get_cursor(info->ctx);
2436   }
2437
2438   /*
2439    * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
2440    * point, so that the log entries end up at the same log shard, so that we process them
2441    * in order
2442    */
2443   void get_hash_key(const string& section, const string& key, string& hash_key) override {
2444     string k;
2445     int pos = key.find(':');
2446     if (pos < 0)
2447       k = key;
2448     else
2449       k = key.substr(0, pos);
2450     hash_key = "bucket:" + k;
2451   }
2452 };
2453
2454 void rgw_bucket_init(RGWMetadataManager *mm)
2455 {
2456   bucket_meta_handler = new RGWBucketMetadataHandler;
2457   mm->register_handler(bucket_meta_handler);
2458   bucket_instance_meta_handler = new RGWBucketInstanceMetadataHandler;
2459   mm->register_handler(bucket_instance_meta_handler);
2460 }