1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "common/backport14.h"
16 #include "rgw_rados.h"
18 #include "rgw_acl_s3.h"
20 #include "include/types.h"
21 #include "rgw_bucket.h"
23 #include "rgw_string.h"
24 #include "rgw_multi.h"
26 #include "include/rados/librados.hpp"
27 // until everything is moved from rgw_common
28 #include "rgw_common.h"
30 #include "cls/user/cls_user_types.h"
32 #define dout_context g_ceph_context
33 #define dout_subsys ceph_subsys_rgw
35 #define BUCKET_TAG_TIMEOUT 30
39 static RGWMetadataHandler *bucket_meta_handler = NULL;
40 static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
42 // define as static when RGWBucket implementation compete
43 void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
45 buckets_obj_id = user_id.to_str();
46 buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
/*
 * Build the name under which a bucket entry is kept in RADOS pools.
 *
 * This is not the inverse of parse_bucket(): that function handles the
 * syntax used in metadata, while this one produces the RADOS-pool
 * representation. '/' separates tenant from bucket because it is not a
 * legal bucket-name character, so tenant-qualified entries can never
 * collide with legacy or S3 bucket names.
 *
 * Returns "" when bucket_name is empty, bucket_name alone when there is no
 * tenant, and "tenant/bucket" otherwise.
 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  if (bucket_name.empty()) {
    return std::string();
  }
  if (tenant_name.empty()) {
    return bucket_name;
  }
  return tenant_name + "/" + bucket_name;
}
/*
 * Split an S3 URL bucket reference "[tenant:]bucket" into tenant and bucket.
 *
 * Tenants are separated from buckets in URLs by a colon in S3.
 * This function is not to be used on Swift URLs, not even for COPY arguments.
 *
 * @param bucket       bucket reference as it appears in the URL
 * @param auth_tenant  tenant used when the reference carries no ':'
 * @param tenant_name  [out] resolved tenant (text before the first ':')
 * @param bucket_name  [out] resolved bucket (text after the first ':')
 */
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {
  // Keep the position as size_type: find() returns npos when there is no ':',
  // and narrowing that into an int relies on implementation-defined conversion.
  const std::string::size_type pos = bucket.find(':');
  if (pos != std::string::npos) {
    /*
     * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
     * to refer to the legacy tenant, in case users in new named tenants
     * want to access old global buckets.
     */
    tenant_name = bucket.substr(0, pos);
    bucket_name = bucket.substr(pos + 1);
  } else {
    tenant_name = auth_tenant;
    bucket_name = bucket;
  }
}
// rgw_read_user_buckets(): list the buckets recorded in user_id's bucket-list
// object (id from rgw_get_buckets_obj(), stored in the zone's user_uid_pool)
// and add them to `buckets`.
//  - Entries are fetched with cls_user_list_buckets(). Each call asks for
//    (max - total) entries and resumes from the marker it returned. Fetching
//    stops when the listing is no longer truncated or `max` entries have been
//    collected.
//  - `max` is replaced by `default_amount` under a condition not visible in
//    this view (presumably max == 0 — confirm).
//  - *is_truncated, when non-null, receives the last truncation flag.
//  - Container stats are refreshed with update_containers_stats(); -ENOENT
//    from that call is not treated as an error. Whether this step is
//    conditional (e.g. on a need_stats flag) is not visible here.
// NOTE(review): several parameter and body lines are missing from this view.
// The handling of -ENOENT from cls_user_list_buckets() and the error returns
// cannot be confirmed from here.
94 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
95 * Returns: 0 on success, -ERR# on failure.
97 int rgw_read_user_buckets(RGWRados * store,
98 const rgw_user& user_id,
99 RGWUserBuckets& buckets,
100 const string& marker,
101 const string& end_marker,
105 uint64_t default_amount)
109 std::string buckets_obj_id;
110 rgw_get_buckets_obj(user_id, buckets_obj_id);
111 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
113 bool truncated = false;
119 max = default_amount;
123 std::list<cls_user_bucket_entry> entries;
// `m` is both the marker passed in and the marker written back for the next batch.
124 ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated);
125 if (ret == -ENOENT) {
133 for (auto& entry : entries) {
// each listed entry is moved into an RGWBucketEnt tagged with user_id
134 buckets.add(RGWBucketEnt(user_id, std::move(entry)));
138 } while (truncated && total < max);
140 if (is_truncated != nullptr) {
141 *is_truncated = truncated;
// NOTE(review): this `m` (the bucket map) must live in an inner scope, since
// the string marker `m` used above is still visible. It therefore shadows
// the marker; consider a distinct name.
145 map<string, RGWBucketEnt>& m = buckets.get_buckets();
146 ret = store->update_containers_stats(m);
147 if (ret < 0 && ret != -ENOENT) {
148 ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
155 int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info)
157 string buckets_obj_id;
158 rgw_get_buckets_obj(user_id, buckets_obj_id);
159 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
161 return store->cls_user_sync_bucket_stats(obj, bucket_info);
164 int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name)
166 RGWBucketInfo bucket_info;
167 RGWObjectCtx obj_ctx(store);
168 int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
170 ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
174 ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info);
176 ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
// rgw_link_bucket(): record `bucket` in user_id's bucket-list object and,
// when update_entrypoint is set, rewrite the bucket entry point as well.
//  - creation_time falls back to real_clock::now() when a zero time is passed.
//  - With update_entrypoint, the current entry point is read first, together
//    with its attrs and version tracker `ot`. Errors other than -ENOENT are
//    logged.
//  - The entry is added with cls_user_add_bucket() on the
//    rgw_get_buckets_obj(user_id) object in user_uid_pool.
//  - The entry point is written back with put_bucket_entrypoint_info(),
//    reusing `ot` from the read.
//  - An error path (its label/condition is on hidden lines) unlinks the
//    bucket again. A failure of that cleanup is only logged.
// NOTE(review): the `bucket` parameter line and several body lines (error
// checks, the fields set on `ep`) are missing from this view.
183 int rgw_link_bucket(RGWRados* const store,
184 const rgw_user& user_id,
186 ceph::real_time creation_time,
187 bool update_entrypoint)
190 string& tenant_name = bucket.tenant;
191 string& bucket_name = bucket.name;
193 cls_user_bucket_entry new_bucket;
195 RGWBucketEntryPoint ep;
196 RGWObjVersionTracker ot;
198 bucket.convert(&new_bucket.bucket);
// a zero creation_time means "not provided": stamp the entry with the current time
200 if (real_clock::is_zero(creation_time))
201 new_bucket.creation_time = real_clock::now();
203 new_bucket.creation_time = creation_time;
205 map<string, bufferlist> attrs;
206 RGWObjectCtx obj_ctx(store);
208 if (update_entrypoint) {
209 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
210 if (ret < 0 && ret != -ENOENT) {
211 ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
212 << cpp_strerror(-ret) << dendl;
216 string buckets_obj_id;
217 rgw_get_buckets_obj(user_id, buckets_obj_id);
219 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
220 ret = store->cls_user_add_bucket(obj, new_bucket);
222 ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: "
223 << cpp_strerror(-ret) << dendl;
227 if (!update_entrypoint)
233 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
// best-effort rollback of the cls_user_add_bucket() above
239 int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name);
241 ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: "
242 << cpp_strerror(-r) << dendl;
// rgw_unlink_bucket(): remove bucket_name from user_id's bucket-list object
// via cls_user_remove_bucket() on the rgw_get_buckets_obj() object. Then,
// unless update_entrypoint is false, rewrite the bucket entry point.
//  - Only `name` is set on the cls_user_bucket used for removal; tenant_name
//    is used only for the entry-point lookup.
//  - The entry point is read with its version tracker and attrs. If its owner
//    differs from user_id, a "can't unlink bucket" message is logged (the
//    return value on that path is on a hidden line).
//  - The entry point is written back with put_bucket_entrypoint_info().
// NOTE(review): the fields changed on `ep` before the write and the error
// returns are not visible in this view.
247 int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint)
251 string buckets_obj_id;
252 rgw_get_buckets_obj(user_id, buckets_obj_id);
254 cls_user_bucket bucket;
255 bucket.name = bucket_name;
256 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
257 ret = store->cls_user_remove_bucket(obj, bucket);
259 ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: "
260 << cpp_strerror(-ret)<< dendl;
263 if (!update_entrypoint)
266 RGWBucketEntryPoint ep;
267 RGWObjVersionTracker ot;
268 map<string, bufferlist> attrs;
269 RGWObjectCtx obj_ctx(store);
270 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
// refuse to touch an entry point that belongs to a different user
279 if (ep.owner != user_id) {
280 ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
285 return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
// rgw_bucket_store_info(): write `bl` as the bucket metadata entry named
// bucket_name through bucket_meta_handler. exclusive, objv_tracker, mtime
// and pattrs are forwarded to meta_mgr->put_entry().
// NOTE(review): the final parameter line (which declares mtime) is missing
// from this view.
288 int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
289 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
291 return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
// rgw_bucket_instance_store_info(): write `bl` as the bucket instance
// metadata entry `entry` through bucket_instance_meta_handler. exclusive,
// objv_tracker, mtime and pattrs are forwarded to meta_mgr->put_entry().
// NOTE(review): the final parameter line (which declares mtime) is missing
// from this view.
294 int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
295 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
297 return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
300 int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker) {
301 return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
// with the existing instance[:shard] format. Once the shard has been parsed,
// the '/' is replaced with a ':' to match the [tenant:]instance oid format.
//
// Only the first '/' is rewritten. A key without a '/' (no tenant) is left
// unchanged.
void rgw_bucket_instance_key_to_oid(std::string& key)
{
  // replace tenant/ with tenant:
  const std::string::size_type slash = key.find('/');
  if (slash != std::string::npos) {
    key[slash] = ':';
  }
}
// Convert bucket instance oids back to the tenant/ format used by metadata
// keys. It is safe to parse 'tenant:' only for oids, because they never carry
// the optional :shard suffix. So when an oid contains two ':', the first
// one must be the tenant separator.
void rgw_bucket_instance_oid_to_key(std::string& oid)
{
  // first ':' is either tenant:bucket or bucket:instance
  const std::string::size_type first = oid.find(':');
  if (first == std::string::npos) {
    return;
  }
  // a second ':' means the first one separated the tenant
  if (oid.find(':', first + 1) != std::string::npos) {
    oid[first] = '/';
  }
}
// rgw_bucket_parse_bucket_instance(): split "<instance>[:<shard>]" at the
// last ':'.
//  - If the text before that ':' contains no further ':', the whole input
//    is taken as the instance (the ':' found was inside the instance, not a
//    shard separator).
//  - Otherwise the prefix becomes the instance, and the suffix is parsed as a
//    base-10 shard id with strict_strtol().
// NOTE(review): the no-':' case, the shard_id value for the unsharded case,
// and the handling of an unparsable shard id are on lines not visible here.
331 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
333 ssize_t pos = bucket_instance.rfind(':');
338 string first = bucket_instance.substr(0, pos);
339 string second = bucket_instance.substr(pos + 1);
341 if (first.find(':') == string::npos) {
343 *target_bucket_instance = bucket_instance;
347 *target_bucket_instance = first;
349 *shard_id = strict_strtol(second.c_str(), 10, &err);
// rgw_bucket_parse_bucket_key(): fill bucket->tenant, bucket->name,
// bucket->bucket_id and *shard_id from `key`.
//  - The tenant is the text before the first '/', if there is one.
//  - Name and instance are split at the first ':' of the remainder.
//  - If the instance has no ':', it becomes bucket_id unchanged. Otherwise
//    the text after its first ':' is parsed as a base-10 shard id with
//    strict_strtol(), and the text before it becomes bucket_id.
// All boost::string_ref pieces are views into `key`. strict_strtol() gets only
// a pointer, so it relies on `shard` being a suffix of `key` (and therefore
// NUL-terminated).
// NOTE(review): *shard_id for the unsharded case and the return codes are on
// lines not visible here.
357 // parse key in format: [tenant/]name:instance[:shard_id]
358 int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
359 rgw_bucket *bucket, int *shard_id)
361 boost::string_ref name{key};
362 boost::string_ref instance;
365 auto pos = name.find('/');
366 if (pos != boost::string_ref::npos) {
367 auto tenant = name.substr(0, pos);
368 bucket->tenant.assign(tenant.begin(), tenant.end());
369 name = name.substr(pos + 1);
372 // split name:instance
373 pos = name.find(':');
374 if (pos != boost::string_ref::npos) {
375 instance = name.substr(pos + 1);
376 name = name.substr(0, pos);
378 bucket->name.assign(name.begin(), name.end());
380 // split instance:shard
381 pos = instance.find(':');
382 if (pos == boost::string_ref::npos) {
383 bucket->bucket_id.assign(instance.begin(), instance.end());
389 auto shard = instance.substr(pos + 1);
391 auto id = strict_strtol(shard.data(), 10, &err);
// instance.data() is not bounded by the view's length: this prints the rest of
// `key` from the instance onward (instance plus shard suffix).
393 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
394 << instance.data() << "': " << err << dendl;
399 instance = instance.substr(0, pos);
400 bucket->bucket_id.assign(instance.begin(), instance.end());
404 int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
405 map<string, bufferlist>& attrs,
406 RGWObjVersionTracker *objv_tracker)
408 rgw_bucket& bucket = bucket_info.bucket;
410 if (!bucket_info.has_instance_obj) {
411 /* an old bucket object, need to convert it */
412 RGWObjectCtx obj_ctx(store);
413 int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name);
415 ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
420 /* we want the bucket instance name without the oid prefix cruft */
421 string key = bucket.get_key();
424 ::encode(bucket_info, bl);
426 return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
429 static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
432 for (const auto& o : objs_to_unlink) {
433 f->dump_string("object", o.name);
// check_bad_user_bucket_mapping(): walk every bucket listed for user_id and
// compare the listed bucket with the bucket info loaded for it.
//  - Buckets are read in pages of rgw_list_buckets_max_chunk, passing
//    `false` for what is presumably the need_stats flag, until the listing
//    is no longer truncated.
//  - A mismatch of name, tenant, marker or bucket_id is printed to stdout.
//    When fixing is enabled (the condition is on a hidden line), the bucket
//    is re-linked to user_id with rgw_link_bucket(), using the loaded info
//    and its creation time.
// NOTE(review): bucket info is looked up with user_id.tenant rather than
// bucket.tenant — confirm this is intended.
437 void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id,
440 RGWUserBuckets user_buckets;
441 bool is_truncated = false;
444 CephContext *cct = store->ctx();
446 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
449 int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
450 string(), max_entries, false,
453 ldout(store->ctx(), 0) << "failed to read user buckets: "
454 << cpp_strerror(-ret) << dendl;
458 map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
459 for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
464 RGWBucketEnt& bucket_ent = i->second;
465 rgw_bucket& bucket = bucket_ent.bucket;
467 RGWBucketInfo bucket_info;
469 RGWObjectCtx obj_ctx(store);
470 int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime);
472 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
476 rgw_bucket& actual_bucket = bucket_info.bucket;
// the user's listing is compared field by field against the loaded bucket info
478 if (actual_bucket.name.compare(bucket.name) != 0 ||
479 actual_bucket.tenant.compare(bucket.tenant) != 0 ||
480 actual_bucket.marker.compare(bucket.marker) != 0 ||
481 actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) {
482 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
484 cout << "fixing" << std::endl;
485 r = rgw_link_bucket(store, user_id, actual_bucket,
486 bucket_info.creation_time);
488 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
493 } while (is_truncated);
// bucket_object_check_filter(): listing filter passed to cls_bucket_list() by
// RGWBucket::check_object_index(). Returns the result of
// rgw_obj_key::oid_to_key_in_ns() for oid. The declarations of `key` and `ns`
// are on lines not visible here.
496 static bool bucket_object_check_filter(const string& oid)
500 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
503 int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key)
505 RGWObjectCtx rctx(store);
507 if (key.instance.empty()) {
508 key.instance = "null";
511 rgw_obj obj(bucket, key);
513 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
// rgw_remove_bucket(): remove `bucket`, optionally deleting its objects first.
//  - Loads the bucket info and stats (error handling is on hidden lines).
//  - Lists all object versions page by page. A non-empty bucket is reported
//    as an error unless delete_children is set; otherwise each listed object
//    is removed with rgw_remove_object(). The loop repeats while the listing
//    returns objects.
//  - Aborts incomplete multipart uploads and syncs user stats (a failure
//    there is only logged as a warning).
//  - Deletes the bucket, then unlinks it from info.owner without updating the
//    entry point.
516 int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children)
519 map<RGWObjCategory, RGWStorageStats> stats;
520 std::vector<rgw_bucket_dir_entry> objs;
521 map<string, bool> common_prefixes;
523 RGWObjectCtx obj_ctx(store);
525 string bucket_ver, master_ver;
527 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
531 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
535 RGWRados::Bucket target(store, info);
536 RGWRados::Bucket::List list_op(&target);
537 CephContext *cct = store->ctx();
540 list_op.params.list_versions = true;
545 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
549 if (!objs.empty() && !delete_children) {
550 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
554 for (const auto& obj : objs) {
555 rgw_obj_key key(obj.key);
556 ret = rgw_remove_object(store, info, bucket, key);
561 } while (!objs.empty());
563 string prefix, delimiter;
565 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
// NOTE(review): bucket.tenant is a string. The only visible overloads are
// (RGWRados*, const rgw_user&, const RGWBucketInfo&) and
// (RGWRados*, const string&, const string&). With `info` as the third
// argument, this call can only bind the rgw_user overload through an implicit
// string -> rgw_user conversion, so the stats would be synced into a bucket
// list named after the tenant rather than the owner's. info.owner (as used for
// rgw_unlink_bucket() below) is probably intended — confirm.
570 ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
572 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
575 RGWObjVersionTracker objv_tracker;
577 ret = store->delete_bucket(info, objv_tracker);
579 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
583 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
585 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
// aio_wait(): return the result of a librados AIO completion via
// get_return_value().
// NOTE(review): the lines that wait on and release the completion are not
// visible here. Confirm the completion is waited for before the value is read
// and released afterwards.
// NOTE(review): the C-style cast converts `handle` to the type it already has
// and can be dropped.
591 static int aio_wait(librados::AioCompletion *handle)
593 librados::AioCompletion *c = (librados::AioCompletion *)handle;
595 int ret = c->get_return_value();
// drain_handles(): wait, via aio_wait(), on each completion taken from the
// front of `pending` until the list is empty. Removing the element from the
// list and combining per-handle errors into the return value happen on lines
// not visible here.
600 static int drain_handles(list<librados::AioCompletion *>& pending)
603 while (!pending.empty()) {
604 librados::AioCompletion *handle = pending.front();
606 int r = aio_wait(handle);
// rgw_remove_bucket_bypass_gc(): delete every object of `bucket` directly,
// using asynchronous RADOS deletes (delete_raw_obj_aio / delete_obj_aio), then
// remove the bucket metadata and unlink the bucket from its owner.
//  - Incomplete multipart uploads are aborted first.
//  - At most concurrent_max deletes are issued before drain_handles() waits
//    for them.
//  - For objects with a manifest, every manifest object except the head is
//    deleted with delete_raw_obj_aio(). The head is deleted last with
//    delete_obj_aio(), which receives keep_index_consistent.
//  - Objects whose state lookup returns -ENOENT are logged (handling after
//    the warning is on a hidden line).
//  - The bucket metadata entry is removed with
//    rgw_bucket_delete_bucket_obj(). The bucket instance entry is removed only
//    when is_syncing_bucket_meta() is false. The bucket is then unlinked from
//    info.owner without updating the entry point.
614 int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket,
615 int concurrent_max, bool keep_index_consistent)
618 map<RGWObjCategory, RGWStorageStats> stats;
619 std::vector<rgw_bucket_dir_entry> objs;
620 map<string, bool> common_prefixes;
622 RGWObjectCtx obj_ctx(store);
623 CephContext *cct = store->ctx();
625 string bucket_ver, master_ver;
627 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
631 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
635 string prefix, delimiter;
637 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
642 RGWRados::Bucket target(store, info);
643 RGWRados::Bucket::List list_op(&target);
645 list_op.params.list_versions = true;
647 std::list<librados::AioCompletion*> handles;
// NOTE(review): with concurrent_max == 0, the `max_aio--` test in the
// manifest loop below is false on its first evaluation, so no non-head
// manifest objects would be deleted. Confirm callers always pass a positive
// value.
650 int max_aio = concurrent_max;
651 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
655 while (!objs.empty()) {
656 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
657 for (; it != objs.end(); ++it) {
658 RGWObjState *astate = NULL;
659 rgw_obj obj(bucket, (*it).key);
661 ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false);
662 if (ret == -ENOENT) {
663 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
667 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
671 if (astate->has_manifest) {
672 RGWObjManifest& manifest = astate->manifest;
673 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
674 rgw_obj head_obj = manifest.get_obj();
675 rgw_raw_obj raw_head_obj;
676 store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
// max_aio counts the remaining AIO slots; it is decremented once per manifest
// object visited and reset to concurrent_max after each drain.
679 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
681 ret = drain_handles(handles);
683 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
686 max_aio = concurrent_max;
689 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
690 if (last_obj == raw_head_obj) {
691 // have the head obj deleted at the end
695 ret = store->delete_raw_obj_aio(last_obj, handles);
697 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
700 } // for all shadow objs
702 ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent);
704 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
710 ret = drain_handles(handles);
712 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
715 max_aio = concurrent_max;
717 } // for all RGW objects
720 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
// wait for whatever is still in flight before touching the bucket metadata
725 ret = drain_handles(handles);
727 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
// NOTE(review): as in rgw_remove_bucket(), bucket.tenant can only bind the
// (const rgw_user&, const RGWBucketInfo&) overload via an implicit
// conversion. info.owner is probably intended — confirm.
731 ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
733 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
736 RGWObjVersionTracker objv_tracker;
// NOTE(review): this message and the one for the instance entry below have no
// space before "with ret as".
738 ret = rgw_bucket_delete_bucket_obj(store, bucket.tenant, bucket.name, objv_tracker);
740 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << "with ret as " << ret << dendl;
744 if (!store->is_syncing_bucket_meta(bucket)) {
745 RGWObjVersionTracker objv_tracker;
746 string entry = bucket.get_key();
747 ret = rgw_bucket_instance_remove_entry(store, entry, &objv_tracker);
749 lderr(store->ctx()) << "ERROR: could not remove bucket instance entry" << bucket.name << "with ret as " << ret << dendl;
754 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
756 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
762 int rgw_bucket_delete_bucket_obj(RGWRados *store,
763 const string& tenant_name,
764 const string& bucket_name,
765 RGWObjVersionTracker& objv_tracker)
769 rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
770 return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
/*
 * Store msg into *sink so the caller can report it.
 * A null sink or an empty message leaves *sink untouched.
 * msg is taken by const reference: the previous by-value parameter copied
 * every message once more before it was assigned into *sink.
 */
static void set_err_msg(std::string *sink, const std::string& msg)
{
  if (sink && !msg.empty())
    *sink = msg;
}
// RGWBucket::init(): resolve the bucket and/or user named in op_state.
//  - Fails (the return value is on a hidden line) when both the bucket name
//    and the user id are empty.
//  - With a bucket name, loads bucket_info using the user's tenant and writes
//    the resolved rgw_bucket back into op_state.
//  - With a user id, loads user_info and copies its display name into
//    op_state.
779 int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state)
786 rgw_user user_id = op_state.get_user_id();
787 tenant = user_id.tenant;
788 bucket_name = op_state.get_bucket_name();
789 RGWUserBuckets user_buckets;
790 RGWObjectCtx obj_ctx(store);
792 if (bucket_name.empty() && user_id.empty())
795 if (!bucket_name.empty()) {
796 int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL);
798 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
802 op_state.set_bucket(bucket_info.bucket);
805 if (!user_id.empty()) {
806 int r = rgw_get_user_info_by_uid(store, user_id, user_info);
810 op_state.display_name = user_info.display_name;
// RGWBucket::link(): re-link an existing bucket instance to the user in
// op_state.
//  - Requires a user op and a non-empty bucket instance id.
//  - Loads the instance "<bucket name>:<bucket_id>". If it carries an ACL
//    attr, decodes the current policy to find the current owner. That owner
//    is unlinked without updating the entry point.
//  - Creates a default policy for user_info with the op_state display name
//    (a warning is logged when that name is empty) and sets it as the bucket
//    owner. Writes the ACL attr to the bucket object in domain_root and to
//    the bucket instance object, then links the bucket to the new user with
//    rgw_link_bucket().
// NOTE(review): the instance key is built from bucket.name and bucket_id only,
// without bucket.tenant — confirm this works for tenanted buckets.
// NOTE(review): the same objv_tracker is passed to both system_obj_set_attr()
// calls although they target different objects — confirm.
// NOTE(review): several lines are missing from this view, including the error
// returns and whether `aclbl` is cleared before the encode() calls re-fill it.
// `user_id` is not used on any visible line.
817 int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
819 if (!op_state.is_user_op()) {
820 set_err_msg(err_msg, "empty user id");
824 string bucket_id = op_state.get_bucket_id();
825 if (bucket_id.empty()) {
826 set_err_msg(err_msg, "empty bucket instance id");
830 std::string display_name = op_state.get_user_display_name();
831 rgw_bucket bucket = op_state.get_bucket();
833 const rgw_pool& root_pool = store->get_zone_params().domain_root;
834 rgw_raw_obj obj(root_pool, bucket.name);
835 RGWObjVersionTracker objv_tracker;
837 map<string, bufferlist> attrs;
838 RGWBucketInfo bucket_info;
840 string key = bucket.name + ":" + bucket_id;
841 RGWObjectCtx obj_ctx(store);
842 int r = store->get_bucket_instance_info(obj_ctx, key, bucket_info, NULL, &attrs);
847 rgw_user user_id = op_state.get_user_id();
849 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
850 if (aiter != attrs.end()) {
851 bufferlist aclbl = aiter->second;
852 RGWAccessControlPolicy policy;
855 bufferlist::iterator iter = aclbl.begin();
856 ::decode(policy, iter);
857 owner = policy.get_owner();
858 } catch (buffer::error& err) {
859 set_err_msg(err_msg, "couldn't decode policy");
// detach the bucket from its previous owner's bucket list (entry point untouched)
863 r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false);
865 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
869 // now update the user for the bucket...
870 if (display_name.empty()) {
871 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
873 policy.create_default(user_info.user_id, display_name);
875 owner = policy.get_owner();
876 r = store->set_bucket_owner(bucket_info.bucket, owner);
878 set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r));
882 // ...and encode the acl
884 policy.encode(aclbl);
886 r = store->system_obj_set_attr(NULL, obj, RGW_ATTR_ACL, aclbl, &objv_tracker);
891 RGWAccessControlPolicy policy_instance;
892 policy_instance.create_default(user_info.user_id, display_name);
894 policy_instance.encode(aclbl);
896 string oid_bucket_instance = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
897 rgw_raw_obj obj_bucket_instance(root_pool, oid_bucket_instance);
898 r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
903 r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket,
// RGWBucket::unlink(): unlink op_state's bucket from the user loaded by init()
// (user_info), using rgw_unlink_bucket()'s default for updating the entry
// point. Requires a user op.
// NOTE(review): "error unlinking bucket" is concatenated with cpp_strerror()
// without a separator.
913 int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg)
915 rgw_bucket bucket = op_state.get_bucket();
917 if (!op_state.is_user_op()) {
918 set_err_msg(err_msg, "could not fetch user or user bucket info");
922 int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name);
924 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
// RGWBucket::remove(): remove op_state's bucket.
// On the bypass-GC path (the branch condition is on a hidden line), the
// objects must be purged (delete_children). rgw_remove_bucket_bypass_gc() is
// then used with op_state's max AIO, and without delete_children an error
// message is set. Otherwise rgw_remove_bucket() is called.
// NOTE(review): "unable to remove bucket" is concatenated with cpp_strerror()
// without a separator.
930 int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc,
931 bool keep_index_consistent, std::string *err_msg)
933 bool delete_children = op_state.will_delete_children();
934 rgw_bucket bucket = op_state.get_bucket();
938 if (delete_children) {
939 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent);
941 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
945 ret = rgw_remove_bucket(store, bucket, delete_children);
949 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
// RGWBucket::remove_object(): delete the object named in op_state from the
// bucket loaded by init() (bucket_info), via rgw_remove_object().
// NOTE(review): "unable to remove object" is concatenated with cpp_strerror()
// without a separator.
956 int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
958 rgw_bucket bucket = op_state.get_bucket();
959 std::string object_name = op_state.get_object_name();
961 rgw_obj_key key(object_name);
963 int ret = rgw_remove_object(store, bucket_info, bucket, key);
965 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
972 static void dump_bucket_index(map<string, rgw_bucket_dir_entry> result, Formatter *f)
974 map<string, rgw_bucket_dir_entry>::iterator iter;
975 for (iter = result.begin(); iter != result.end(); ++iter) {
976 f->dump_string("object", iter->first);
// dump_bucket_usage(): emit a "usage" object containing one sub-object per
// RGWObjCategory, named by rgw_obj_category_name(). The line that writes each
// category's RGWStorageStats (`s`) into its section is not visible here.
// NOTE(review): `stats` is only read on the visible lines. It could be taken
// by const reference if the hidden per-category dump is const.
980 static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
982 map<RGWObjCategory, RGWStorageStats>::iterator iter;
984 formatter->open_object_section("usage");
985 for (iter = stats.begin(); iter != stats.end(); ++iter) {
986 RGWStorageStats& s = iter->second;
987 const char *cat_name = rgw_obj_category_name(iter->first);
988 formatter->open_object_section(cat_name);
990 formatter->close_section();
992 formatter->close_section();
995 static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
996 map<RGWObjCategory, RGWStorageStats> calculated_stats,
997 Formatter *formatter)
999 formatter->open_object_section("check_result");
1000 formatter->open_object_section("existing_header");
1001 dump_bucket_usage(existing_stats, formatter);
1002 formatter->close_section();
1003 formatter->open_object_section("calculated_header");
1004 dump_bucket_usage(calculated_stats, formatter);
1005 formatter->close_section();
1006 formatter->close_section();
// RGWBucket::check_bad_index_multipart(): find multipart-namespace index
// entries whose upload has no ".meta" object.
//  - Lists all object versions in RGW_OBJ_NS_MULTIPART page by page.
//  - For each entry, looks at the oid suffix after the last '.'. A "meta"
//    suffix marks the name before it as having a meta object. Other suffixed
//    entries are recorded under the name before the suffix. Entries without a
//    '.' are recorded under their full oid.
//  - Every recorded entry whose name has no meta object is collected. The
//    batch is flushed when it grows past `max` and again at the end. A flush
//    removes the entries from the index with remove_objs_from_index()
//    (presumably only when fix_index is set — condition not visible) and
//    dumps them under "invalid_multipart_entries".
// NOTE(review): `int pos = oid.find_last_of('.')` narrows a size_t. The
// no-'.' test is on a hidden line — confirm it handles npos.
// NOTE(review): the `int r` declared inside the listing loop shadows the
// outer `r`.
1009 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
1010 RGWFormatterFlusher& flusher ,std::string *err_msg)
1012 bool fix_index = op_state.will_fix_index();
1013 rgw_bucket bucket = op_state.get_bucket();
1017 map<string, bool> common_prefixes;
1020 map<string, bool> meta_objs;
1021 map<rgw_obj_index_key, string> all_objs;
1023 RGWBucketInfo bucket_info;
1024 RGWObjectCtx obj_ctx(store);
1025 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
1027 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
1031 RGWRados::Bucket target(store, bucket_info);
1032 RGWRados::Bucket::List list_op(&target);
1034 list_op.params.list_versions = true;
1035 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
1038 vector<rgw_bucket_dir_entry> result;
1039 int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated);
1041 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
1042 " err=" + cpp_strerror(-r));
1047 vector<rgw_bucket_dir_entry>::iterator iter;
1048 for (iter = result.begin(); iter != result.end(); ++iter) {
1049 rgw_obj_index_key key = iter->key;
1050 rgw_obj obj(bucket, key);
1051 string oid = obj.get_oid();
1053 int pos = oid.find_last_of('.');
1055 /* obj has no suffix */
1056 all_objs[key] = oid;
1058 /* obj has suffix */
1059 string name = oid.substr(0, pos);
1060 string suffix = oid.substr(pos + 1);
1062 if (suffix.compare("meta") == 0) {
1063 meta_objs[name] = true;
1065 all_objs[key] = name;
1070 } while (is_truncated);
1072 list<rgw_obj_index_key> objs_to_unlink;
1073 Formatter *f = flusher.get_formatter();
1075 f->open_array_section("invalid_multipart_entries");
1077 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
1078 string& name = aiter->second;
// an entry whose upload name never appeared with a ".meta" suffix is orphaned
1080 if (meta_objs.find(name) == meta_objs.end()) {
1081 objs_to_unlink.push_back(aiter->first);
1084 if (objs_to_unlink.size() > max) {
1086 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1088 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1094 dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1096 objs_to_unlink.clear();
1101 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1103 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1110 dump_mulipart_index_results(objs_to_unlink, f);
// RGWBucket::check_object_index(): list the whole bucket index (1000 entries
// per cls_bucket_list() call, filtered by bucket_object_check_filter) and dump
// each page's entry names inside an "objects" section. An error message is
// set when fix index is not enabled (the condition is on a hidden line).
// While listing, the bucket tag timeout is set to BUCKET_TAG_TIMEOUT; it is
// reset to 0 afterwards.
// NOTE(review): on the visible lines, the r < 0 branch only records an error
// message. If no hidden line leaves the loop there, a failing
// cls_bucket_list() that leaves is_truncated set would loop forever —
// confirm. An early return on that path would also skip the timeout reset.
1117 int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1118 RGWFormatterFlusher& flusher,
1119 std::string *err_msg)
1122 bool fix_index = op_state.will_fix_index();
1124 rgw_bucket bucket = op_state.get_bucket();
1127 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1131 store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1134 rgw_obj_index_key marker;
1135 bool is_truncated = true;
1137 Formatter *formatter = flusher.get_formatter();
1138 formatter->open_object_section("objects");
1139 while (is_truncated) {
1140 map<string, rgw_bucket_dir_entry> result;
1142 int r = store->cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, 1000, true,
1143 result, &is_truncated, &marker,
1144 bucket_object_check_filter);
1147 } else if (r < 0 && r != -ENOENT) {
1148 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1152 dump_bucket_index(result, formatter);
1157 formatter->close_section();
1159 store->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
// RGWBucket::check_index(): run bucket_check_index() to fill existing_stats
// and calculated_stats. When fix_index is set (the condition is on a hidden
// line), the index is then rebuilt with bucket_rebuild_index(). Failures set
// an error message.
1165 int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1166 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1167 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1168 std::string *err_msg)
1170 rgw_bucket bucket = op_state.get_bucket();
1171 bool fix_index = op_state.will_fix_index();
1173 int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1175 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1180 r = store->bucket_rebuild_index(bucket_info);
1182 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
// RGWBucket::policy_bl_to_stream(): decode an S3 ACL policy from bl. A
// buffer::error during decoding is logged. Writing the policy to `o` and the
// return values are on lines not visible here.
1191 int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1193 RGWAccessControlPolicy_S3 policy(g_ceph_context);
1194 bufferlist::iterator iter = bl.begin();
1196 policy.decode(iter);
1197 } catch (buffer::error& err) {
1198 dout(0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
// policy_decode(): decode `policy` from bl, logging a buffer::error through
// store's CephContext. The opening of the try block and the return values are
// on lines not visible here.
1205 static int policy_decode(RGWRados *store, bufferlist& bl, RGWAccessControlPolicy& policy)
1207 bufferlist::iterator iter = bl.begin();
1209 policy.decode(iter);
1210 } catch (buffer::error& err) {
1211 ldout(store->ctx(), 0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
// RGWBucket::get_policy(): fetch the ACL policy of op_state's object, or of
// the bucket itself when no object name is given.
//  - Object: reads the RGW_ATTR_ACL attr through RGWRados::Object::Read.
//  - Bucket: takes RGW_ATTR_ACL from the attrs loaded with the bucket info.
//    The missing-attr case is handled on a hidden line.
// In both cases the attr is decoded with policy_decode().
// NOTE(review): the inner `int ret` shadows the outer `ret`, and `bl` is
// declared on a hidden line.
1217 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy)
1219 std::string object_name = op_state.get_object_name();
1220 rgw_bucket bucket = op_state.get_bucket();
1221 RGWObjectCtx obj_ctx(store);
1223 RGWBucketInfo bucket_info;
1224 map<string, bufferlist> attrs;
1225 int ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
1230 if (!object_name.empty()) {
1232 rgw_obj obj(bucket, object_name);
1234 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
1235 RGWRados::Object::Read rop(&op_target);
1237 int ret = rop.get_attr(RGW_ATTR_ACL, bl);
1241 return policy_decode(store, bl, policy);
1244 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1245 if (aiter == attrs.end()) {
1249 return policy_decode(store, aiter->second, policy);
// RGWBucketAdminOp::get_policy(): initialise an RGWBucket from op_state and
// fetch the bucket/object ACL policy into `policy`.
1253 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1254 RGWAccessControlPolicy& policy)
1258 int ret = bucket.init(store, op_state);
1262 ret = bucket.get_policy(op_state, policy);
1269 /* Wrappers to facilitate RESTful interface */
// RGWBucketAdminOp::get_policy() (formatter variant): fetch the policy with
// the overload above and dump it as a "policy" object through the flusher's
// formatter.
1272 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1273 RGWFormatterFlusher& flusher)
1275 RGWAccessControlPolicy policy(store->ctx());
1277 int ret = get_policy(store, op_state, policy);
1281 Formatter *formatter = flusher.get_formatter();
1285 formatter->open_object_section("policy");
1286 policy.dump(formatter);
1287 formatter->close_section();
// RGWBucketAdminOp::dump_s3_policy(): fetch the policy into an S3 policy
// object. The remaining parameters and the output step are on lines not
// visible here.
1294 int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1297 RGWAccessControlPolicy_S3 policy(store->ctx());
1299 int ret = get_policy(store, op_state, policy);
// RGWBucketAdminOp::unlink(): initialise an RGWBucket from op_state and unlink
// the bucket from the op_state user.
1308 int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state)
1312 int ret = bucket.init(store, op_state);
1316 return bucket.unlink(op_state);
// RGWBucketAdminOp::link(): initialise an RGWBucket from op_state and link the
// bucket instance to the op_state user. Any error text goes to *err.
1319 int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err)
1323 int ret = bucket.init(store, op_state);
1327 return bucket.link(op_state, err);
// RGWBucketAdminOp::check_index(): initialise the bucket, then run the
// multipart-index check, the object-index check and the index stats check.
// The conditions guarding each step are on hidden lines. Finally the existing
// and calculated stats are dumped via dump_index_check().
1331 int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
1332 RGWFormatterFlusher& flusher)
1335 map<RGWObjCategory, RGWStorageStats> existing_stats;
1336 map<RGWObjCategory, RGWStorageStats> calculated_stats;
1341 ret = bucket.init(store, op_state);
1345 Formatter *formatter = flusher.get_formatter();
1348 ret = bucket.check_bad_index_multipart(op_state, flusher);
1352 ret = bucket.check_object_index(op_state, flusher);
1356 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1360 dump_index_check(existing_stats, calculated_stats, formatter);
1366 int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state,
1367 bool bypass_gc, bool keep_index_consistent)
1371 int ret = bucket.init(store, op_state);
1375 std::string err_msg;
1376 ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg);
1377 if (!err_msg.empty()) {
1378 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1383 int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state)
1387 int ret = bucket.init(store, op_state);
1391 return bucket.remove_object(op_state);
1394 static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter)
1396 RGWBucketInfo bucket_info;
1397 map<RGWObjCategory, RGWStorageStats> stats;
1400 RGWObjectCtx obj_ctx(store);
1401 int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime);
1405 rgw_bucket& bucket = bucket_info.bucket;
1407 string bucket_ver, master_ver;
1409 int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1411 cerr << "error getting bucket stats ret=" << ret << std::endl;
1417 formatter->open_object_section("stats");
1418 formatter->dump_string("bucket", bucket.name);
1419 formatter->dump_string("zonegroup", bucket_info.zonegroup);
1420 formatter->dump_string("placement_rule", bucket_info.placement_rule);
1421 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
1422 formatter->dump_string("id", bucket.bucket_id);
1423 formatter->dump_string("marker", bucket.marker);
1424 formatter->dump_stream("index_type") << bucket_info.index_type;
1425 ::encode_json("owner", bucket_info.owner, formatter);
1426 formatter->dump_string("ver", bucket_ver);
1427 formatter->dump_string("master_ver", master_ver);
1428 formatter->dump_stream("mtime") << ut;
1429 formatter->dump_string("max_marker", max_marker);
1430 dump_bucket_usage(stats, formatter);
1431 encode_json("bucket_quota", bucket_info.quota, formatter);
1432 formatter->close_section();
1437 int RGWBucketAdminOp::limit_check(RGWRados *store,
1438 RGWBucketAdminOpState& op_state,
1439 const std::list<std::string>& user_ids,
1440 RGWFormatterFlusher& flusher,
1444 const size_t max_entries =
1445 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1447 const size_t safe_max_objs_per_shard =
1448 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1450 uint16_t shard_warn_pct =
1451 store->ctx()->_conf->rgw_shard_warning_threshold;
1452 if (shard_warn_pct > 100)
1453 shard_warn_pct = 90;
1455 Formatter *formatter = flusher.get_formatter();
1458 formatter->open_array_section("users");
1460 for (const auto& user_id : user_ids) {
1461 formatter->open_object_section("user");
1462 formatter->dump_string("user_id", user_id);
1464 formatter->open_array_section("buckets");
1466 RGWUserBuckets buckets;
1470 ret = rgw_read_user_buckets(store, user_id, buckets,
1471 marker, string(), max_entries, false,
1476 map<string, RGWBucketEnt>& m_buckets = buckets.get_buckets();
1478 for (const auto& iter : m_buckets) {
1479 auto& bucket = iter.second.bucket;
1480 uint32_t num_shards = 1;
1481 uint64_t num_objects = 0;
1483 /* need info for num_shards */
1485 RGWObjectCtx obj_ctx(store);
1487 marker = bucket.name; /* Casey's location for marker update,
1488 * as we may now not reach the end of
1491 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name,
1496 /* need stats for num_entries */
1497 string bucket_ver, master_ver;
1498 std::map<RGWObjCategory, RGWStorageStats> stats;
1499 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1500 &master_ver, stats, nullptr);
1505 for (const auto& s : stats) {
1506 num_objects += s.second.num_objects;
1509 num_shards = info.num_shards;
1510 uint64_t objs_per_shard =
1511 (num_shards) ? num_objects/num_shards : num_objects;
1515 if (objs_per_shard > safe_max_objs_per_shard) {
1517 100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1518 ss << boost::format("OVER %4f%%") % over;
1522 objs_per_shard / safe_max_objs_per_shard * 100;
1523 if (fill_pct >= shard_warn_pct) {
1524 ss << boost::format("WARN %4f%%") % fill_pct;
1531 if (warn || (! warnings_only)) {
1532 formatter->open_object_section("bucket");
1533 formatter->dump_string("bucket", bucket.name);
1534 formatter->dump_string("tenant", bucket.tenant);
1535 formatter->dump_int("num_objects", num_objects);
1536 formatter->dump_int("num_shards", num_shards);
1537 formatter->dump_int("objects_per_shard", objs_per_shard);
1538 formatter->dump_string("fill_status", ss.str());
1539 formatter->close_section();
1544 done = (m_buckets.size() < max_entries);
1545 } while (!done); /* foreach: bucket */
1547 formatter->close_section();
1548 formatter->close_section();
1549 formatter->flush(cout);
1551 } /* foreach: user_id */
1553 formatter->close_section();
1554 formatter->flush(cout);
1557 } /* RGWBucketAdminOp::limit_check */
1559 int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
1560 RGWFormatterFlusher& flusher)
1565 string bucket_name = op_state.get_bucket_name();
1567 if (!bucket_name.empty()) {
1568 ret = bucket.init(store, op_state);
1573 Formatter *formatter = flusher.get_formatter();
1576 CephContext *cct = store->ctx();
1578 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1580 bool show_stats = op_state.will_fetch_stats();
1581 rgw_user user_id = op_state.get_user_id();
1582 if (op_state.is_user_op()) {
1583 formatter->open_array_section("buckets");
1585 RGWUserBuckets buckets;
1587 bool is_truncated = false;
1590 ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets,
1591 marker, string(), max_entries, false,
1596 map<string, RGWBucketEnt>& m = buckets.get_buckets();
1597 map<string, RGWBucketEnt>::iterator iter;
1599 for (iter = m.begin(); iter != m.end(); ++iter) {
1600 std::string obj_name = iter->first;
1602 bucket_stats(store, user_id.tenant, obj_name, formatter);
1604 formatter->dump_string("bucket", obj_name);
1610 } while (is_truncated);
1612 formatter->close_section();
1613 } else if (!bucket_name.empty()) {
1614 bucket_stats(store, user_id.tenant, bucket_name, formatter);
1616 RGWAccessHandle handle;
1618 formatter->open_array_section("buckets");
1619 if (store->list_buckets_init(&handle) >= 0) {
1620 rgw_bucket_dir_entry obj;
1621 while (store->list_buckets_next(obj, &handle) >= 0) {
1623 bucket_stats(store, user_id.tenant, obj.key.name, formatter);
1625 formatter->dump_string("bucket", obj.key.name);
1629 formatter->close_section();
1638 void rgw_data_change::dump(Formatter *f) const
1641 switch (entity_type) {
1642 case ENTITY_TYPE_BUCKET:
1648 encode_json("entity_type", type, f);
1649 encode_json("key", key, f);
1650 utime_t ut(timestamp);
1651 encode_json("timestamp", ut, f);
1654 void rgw_data_change::decode_json(JSONObj *obj) {
1656 JSONDecoder::decode_json("entity_type", s, obj);
1657 if (s == "bucket") {
1658 entity_type = ENTITY_TYPE_BUCKET;
1660 entity_type = ENTITY_TYPE_UNKNOWN;
1662 JSONDecoder::decode_json("key", key, obj);
1664 JSONDecoder::decode_json("timestamp", ut, obj);
1665 timestamp = ut.to_real_time();
1668 void rgw_data_change_log_entry::dump(Formatter *f) const
1670 encode_json("log_id", log_id, f);
1671 utime_t ut(log_timestamp);
1672 encode_json("log_timestamp", ut, f);
1673 encode_json("entry", entry, f);
1676 void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
1677 JSONDecoder::decode_json("log_id", log_id, obj);
1679 JSONDecoder::decode_json("log_timestamp", ut, obj);
1680 log_timestamp = ut.to_real_time();
1681 JSONDecoder::decode_json("entry", entry, obj);
1684 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
1685 const string& name = bs.bucket.name;
1686 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
1687 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
1692 int RGWDataChangesLog::renew_entries()
1694 if (!store->need_to_log_data())
1697 /* we can't keep the bucket name as part of the cls_log_entry, and we need
1698 * it later, so we keep two lists under the map */
1699 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
1702 map<rgw_bucket_shard, bool> entries;
1703 entries.swap(cur_cycle);
1706 map<rgw_bucket_shard, bool>::iterator iter;
1708 real_time ut = real_clock::now();
1709 for (iter = entries.begin(); iter != entries.end(); ++iter) {
1710 const rgw_bucket_shard& bs = iter->first;
1712 int index = choose_oid(bs);
1714 cls_log_entry entry;
1716 rgw_data_change change;
1718 change.entity_type = ENTITY_TYPE_BUCKET;
1719 change.key = bs.get_key();
1720 change.timestamp = ut;
1721 ::encode(change, bl);
1723 store->time_log_prepare_entry(entry, ut, section, change.key, bl);
1725 m[index].first.push_back(bs);
1726 m[index].second.emplace_back(std::move(entry));
1729 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
1730 for (miter = m.begin(); miter != m.end(); ++miter) {
1731 list<cls_log_entry>& entries = miter->second.second;
1733 real_time now = real_clock::now();
1735 int ret = store->time_log_add(oids[miter->first], entries, NULL);
1737 /* we don't really need to have a special handling for failed cases here,
1738 * as this is just an optimization. */
1739 lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
1743 real_time expiration = now;
1744 expiration += make_timespan(cct->_conf->rgw_data_log_window);
1746 list<rgw_bucket_shard>& buckets = miter->second.first;
1747 list<rgw_bucket_shard>::iterator liter;
1748 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
1749 update_renewed(*liter, expiration);
1756 void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
1758 assert(lock.is_locked());
1759 if (!changes.find(bs, status)) {
1760 status = ChangeStatusPtr(new ChangeStatus);
1761 changes.add(bs, status);
1765 void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
1767 Mutex::Locker l(lock);
1768 cur_cycle[bs] = true;
1771 void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
1773 Mutex::Locker l(lock);
1774 ChangeStatusPtr status;
1775 _get_change(bs, status);
1777 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
1778 status->cur_expiration = expiration;
1781 int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
1782 rgw_bucket_shard bs(bucket, shard_id);
1784 return choose_oid(bs);
1787 int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
1788 if (!store->need_to_log_data())
1791 rgw_bucket_shard bs(bucket, shard_id);
1793 int index = choose_oid(bs);
1794 mark_modified(index, bs);
1798 ChangeStatusPtr status;
1799 _get_change(bs, status);
1803 real_time now = real_clock::now();
1805 status->lock->Lock();
1807 ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
1809 if (now < status->cur_expiration) {
1810 /* no need to send, recently completed */
1811 status->lock->Unlock();
1817 RefCountedCond *cond;
1819 if (status->pending) {
1820 cond = status->cond;
1824 status->cond->get();
1825 status->lock->Unlock();
1827 int ret = cond->wait();
1835 status->cond = new RefCountedCond;
1836 status->pending = true;
1838 string& oid = oids[index];
1839 real_time expiration;
1844 status->cur_sent = now;
1847 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
1849 status->lock->Unlock();
1852 rgw_data_change change;
1853 change.entity_type = ENTITY_TYPE_BUCKET;
1854 change.key = bs.get_key();
1855 change.timestamp = now;
1856 ::encode(change, bl);
1859 ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
1861 ret = store->time_log_add(oid, now, section, change.key, bl);
1863 now = real_clock::now();
1865 status->lock->Lock();
1867 } while (!ret && real_clock::now() > expiration);
1869 cond = status->cond;
1871 status->pending = false;
1872 status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
1873 status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
1874 status->cond = NULL;
1875 status->lock->Unlock();
1883 int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
1884 list<rgw_data_change_log_entry>& entries,
1885 const string& marker,
1888 if (shard >= num_shards)
1891 list<cls_log_entry> log_entries;
1893 int ret = store->time_log_list(oids[shard], start_time, end_time,
1894 max_entries, log_entries, marker,
1895 out_marker, truncated);
1899 list<cls_log_entry>::iterator iter;
1900 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
1901 rgw_data_change_log_entry log_entry;
1902 log_entry.log_id = iter->id;
1903 real_time rt = iter->timestamp.to_real_time();
1904 log_entry.log_timestamp = rt;
1905 bufferlist::iterator liter = iter->data.begin();
1907 ::decode(log_entry.entry, liter);
1908 } catch (buffer::error& err) {
1909 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
1912 entries.push_back(log_entry);
1918 int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
1919 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
1923 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
1924 marker.shard++, marker.marker.clear()) {
1925 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
1926 marker.marker, NULL, &truncated);
1927 if (ret == -ENOENT) {
1939 *ptruncated = (marker.shard < num_shards);
1944 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
1946 if (shard_id >= num_shards)
1949 string oid = oids[shard_id];
1951 cls_log_header header;
1953 int ret = store->time_log_info(oid, &header);
1954 if ((ret < 0) && (ret != -ENOENT))
1957 info->marker = header.max_marker;
1958 info->last_update = header.max_time.to_real_time();
1963 int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
1964 const string& start_marker, const string& end_marker)
1968 if (shard_id > num_shards)
1971 ret = store->time_log_trim(oids[shard_id], start_time, end_time, start_marker, end_marker);
1973 if (ret == -ENOENT || ret == -ENODATA)
1979 int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time& end_time,
1980 const string& start_marker, const string& end_marker)
1982 for (int shard = 0; shard < num_shards; shard++) {
1983 int ret = store->time_log_trim(oids[shard], start_time, end_time, start_marker, end_marker);
1984 if (ret == -ENOENT || ret == -ENODATA) {
1994 bool RGWDataChangesLog::going_down()
1999 RGWDataChangesLog::~RGWDataChangesLog() {
2001 renew_thread->stop();
2002 renew_thread->join();
2003 delete renew_thread;
2007 void *RGWDataChangesLog::ChangesRenewThread::entry() {
2009 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2010 int r = log->renew_entries();
2012 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2015 if (log->going_down())
2018 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2020 cond.WaitInterval(lock, utime_t(interval, 0));
2022 } while (!log->going_down());
2027 void RGWDataChangesLog::ChangesRenewThread::stop()
2029 Mutex::Locker l(lock);
2033 void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2035 auto key = bs.get_key();
2036 modified_lock.get_read();
2037 map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
2038 if (iter != modified_shards.end()) {
2039 set<string>& keys = iter->second;
2040 if (keys.find(key) != keys.end()) {
2041 modified_lock.unlock();
2045 modified_lock.unlock();
2047 RWLock::WLocker wl(modified_lock);
2048 modified_shards[shard_id].insert(key);
2051 void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2053 RWLock::WLocker wl(modified_lock);
2054 modified.swap(modified_shards);
2055 modified_shards.clear();
2058 void RGWBucketCompleteInfo::dump(Formatter *f) const {
2059 encode_json("bucket_info", info, f);
2060 encode_json("attrs", attrs, f);
2063 void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2064 JSONDecoder::decode_json("bucket_info", info, obj);
2065 JSONDecoder::decode_json("attrs", attrs, obj);
2068 class RGWBucketMetadataHandler : public RGWMetadataHandler {
2071 string get_type() override { return "bucket"; }
2073 int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
2074 RGWObjVersionTracker ot;
2075 RGWBucketEntryPoint be;
2078 map<string, bufferlist> attrs;
2079 RGWObjectCtx obj_ctx(store);
2081 string tenant_name, bucket_name;
2082 parse_bucket(entry, &tenant_name, &bucket_name);
2083 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
2087 RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime);
2094 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2095 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2096 RGWBucketEntryPoint be, old_be;
2098 decode_json_obj(be, obj);
2099 } catch (JSONDecoder::err& e) {
2103 real_time orig_mtime;
2104 map<string, bufferlist> attrs;
2106 RGWObjVersionTracker old_ot;
2107 RGWObjectCtx obj_ctx(store);
2109 string tenant_name, bucket_name;
2110 parse_bucket(entry, &tenant_name, &bucket_name);
2111 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
2112 if (ret < 0 && ret != -ENOENT)
2115 // are we actually going to perform this put, or is it too old?
2116 if (ret != -ENOENT &&
2117 !check_versions(old_ot.read_version, orig_mtime,
2118 objv_tracker.write_version, mtime, sync_type)) {
2119 return STATUS_NO_APPLY;
2122 objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
2124 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
2130 ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false);
2132 ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant,
2133 be.bucket.name, false);
2139 struct list_keys_info {
2141 RGWListRawObjsCtx ctx;
2144 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2145 RGWBucketEntryPoint be;
2146 RGWObjectCtx obj_ctx(store);
2148 string tenant_name, bucket_name;
2149 parse_bucket(entry, &tenant_name, &bucket_name);
2150 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
2155 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2156 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2157 * will incorrectly fail.
2159 ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
2161 lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2164 ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
2166 lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
2172 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2174 pool = store->get_zone_params().domain_root;
2177 int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2178 auto info = ceph::make_unique<list_keys_info>();
2180 info->store = store;
2182 int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2187 *phandle = (void *)info.release();
2192 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2193 list_keys_info *info = static_cast<list_keys_info *>(handle);
2199 RGWRados *store = info->store;
2201 list<string> unfiltered_keys;
2203 int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2204 unfiltered_keys, truncated);
2205 if (ret < 0 && ret != -ENOENT)
2207 if (ret == -ENOENT) {
2213 // now filter out the system entries
2214 list<string>::iterator iter;
2215 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2226 void list_keys_complete(void *handle) override {
2227 list_keys_info *info = static_cast<list_keys_info *>(handle);
2231 string get_marker(void *handle) {
2232 list_keys_info *info = static_cast<list_keys_info *>(handle);
2233 return info->store->list_raw_objs_get_cursor(info->ctx);
2237 class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
2240 string get_type() override { return "bucket.instance"; }
2242 int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override {
2243 RGWBucketCompleteInfo bci;
2246 RGWObjectCtx obj_ctx(store);
2248 int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs);
2252 RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2259 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2260 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2261 RGWBucketCompleteInfo bci, old_bci;
2263 decode_json_obj(bci, obj);
2264 } catch (JSONDecoder::err& e) {
2268 real_time orig_mtime;
2269 RGWObjectCtx obj_ctx(store);
2271 int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info,
2272 &orig_mtime, &old_bci.attrs);
2273 bool exists = (ret != -ENOENT);
2274 if (ret < 0 && exists)
2277 if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) {
2278 /* a new bucket, we need to select a new bucket placement for it */
2280 rgw_bucket_instance_oid_to_key(key);
2283 string bucket_instance;
2284 parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance);
2286 RGWZonePlacementInfo rule_info;
2287 bci.info.bucket.name = bucket_name;
2288 bci.info.bucket.bucket_id = bucket_instance;
2289 bci.info.bucket.tenant = tenant_name;
2290 ret = store->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
2292 ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
2295 bci.info.index_type = rule_info.index_type;
2297 /* existing bucket, keep its placement */
2298 bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement;
2299 bci.info.placement_rule = old_bci.info.placement_rule;
2302 if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) {
2303 int shards_num = bci.info.num_shards? bci.info.num_shards : 1;
2304 int shard_id = bci.info.num_shards? 0 : -1;
2306 if (!bci.info.datasync_flag_enabled()) {
2307 ret = store->stop_bi_log_entries(bci.info, -1);
2309 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2313 ret = store->resync_bi_log_entries(bci.info, -1);
2315 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2320 for (int i = 0; i < shards_num; ++i, ++shard_id) {
2321 ret = store->data_log->add_entry(bci.info.bucket, shard_id);
2323 lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
2329 // are we actually going to perform this put, or is it too old?
2331 !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
2332 objv_tracker.write_version, mtime, sync_type)) {
2333 objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2334 return STATUS_NO_APPLY;
2337 /* record the read version (if any), store the new version */
2338 bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2339 bci.info.objv_tracker.write_version = objv_tracker.write_version;
2341 ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs);
2345 objv_tracker = bci.info.objv_tracker;
2347 ret = store->init_bucket_index(bci.info, bci.info.num_shards);
2351 return STATUS_APPLIED;
2354 struct list_keys_info {
2356 RGWListRawObjsCtx ctx;
2359 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2361 RGWObjectCtx obj_ctx(store);
2363 int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL);
2364 if (ret < 0 && ret != -ENOENT)
2367 return rgw_bucket_instance_remove_entry(store, entry, &info.objv_tracker);
2370 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2371 oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
2372 rgw_bucket_instance_key_to_oid(oid);
2373 pool = store->get_zone_params().domain_root;
2376 int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2377 auto info = ceph::make_unique<list_keys_info>();
2379 info->store = store;
2381 int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2386 *phandle = (void *)info.release();
2391 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2392 list_keys_info *info = static_cast<list_keys_info *>(handle);
2398 RGWRados *store = info->store;
2400 list<string> unfiltered_keys;
2402 int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2403 unfiltered_keys, truncated);
2404 if (ret < 0 && ret != -ENOENT)
2406 if (ret == -ENOENT) {
2412 constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1;
2413 // now filter in the relevant entries
2414 list<string>::iterator iter;
2415 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2418 if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) {
2419 auto oid = k.substr(prefix_size);
2420 rgw_bucket_instance_oid_to_key(oid);
2421 keys.emplace_back(std::move(oid));
2428 void list_keys_complete(void *handle) override {
2429 list_keys_info *info = static_cast<list_keys_info *>(handle);
2433 string get_marker(void *handle) {
2434 list_keys_info *info = static_cast<list_keys_info *>(handle);
2435 return info->store->list_raw_objs_get_cursor(info->ctx);
2439 * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
2440 * point, so that the log entries end up at the same log shard, so that we process them
2443 void get_hash_key(const string& section, const string& key, string& hash_key) override {
2445 int pos = key.find(':');
2449 k = key.substr(0, pos);
2450 hash_key = "bucket:" + k;
2454 void rgw_bucket_init(RGWMetadataManager *mm)
2456 bucket_meta_handler = new RGWBucketMetadataHandler;
2457 mm->register_handler(bucket_meta_handler);
2458 bucket_instance_meta_handler = new RGWBucketInstanceMetadataHandler;
2459 mm->register_handler(bucket_instance_meta_handler);