X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_bucket.cc;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_bucket.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=2a38733aa2966740b3dcbc1dec48eba6e8fb56fe;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_bucket.cc b/src/ceph/src/rgw/rgw_bucket.cc deleted file mode 100644 index 2a38733..0000000 --- a/src/ceph/src/rgw/rgw_bucket.cc +++ /dev/null @@ -1,2460 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include - -#include -#include -#include - -#include -#include - -#include "common/errno.h" -#include "common/ceph_json.h" -#include "common/backport14.h" -#include "rgw_rados.h" -#include "rgw_acl.h" -#include "rgw_acl_s3.h" - -#include "include/types.h" -#include "rgw_bucket.h" -#include "rgw_user.h" -#include "rgw_string.h" -#include "rgw_multi.h" - -#include "include/rados/librados.hpp" -// until everything is moved from rgw_common -#include "rgw_common.h" - -#include "cls/user/cls_user_types.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rgw - -#define BUCKET_TAG_TIMEOUT 30 - -using namespace std; - -static RGWMetadataHandler *bucket_meta_handler = NULL; -static RGWMetadataHandler *bucket_instance_meta_handler = NULL; - -// define as static when RGWBucket implementation compete -void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id) -{ - buckets_obj_id = user_id.to_str(); - buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX; -} - -/* - * Note that this is not a reversal of parse_bucket(). That one deals - * with the syntax we need in metadata and such. This one deals with - * the representation in RADOS pools. We chose '/' because it's not - * acceptable in bucket names and thus qualified buckets cannot conflict - * with the legacy or S3 buckets. - */ -std::string rgw_make_bucket_entry_name(const std::string& tenant_name, - const std::string& bucket_name) { - std::string bucket_entry; - - if (bucket_name.empty()) { - bucket_entry.clear(); - } else if (tenant_name.empty()) { - bucket_entry = bucket_name; - } else { - bucket_entry = tenant_name + "/" + bucket_name; - } - - return bucket_entry; -} - -/* - * Tenants are separated from buckets in URLs by a colon in S3. - * This function is not to be used on Swift URLs, not even for COPY arguments. - */ -void rgw_parse_url_bucket(const string &bucket, const string& auth_tenant, - string &tenant_name, string &bucket_name) { - - int pos = bucket.find(':'); - if (pos >= 0) { - /* - * N.B.: We allow ":bucket" syntax with explicit empty tenant in order - * to refer to the legacy tenant, in case users in new named tenants - * want to access old global buckets. - */ - tenant_name = bucket.substr(0, pos); - bucket_name = bucket.substr(pos + 1); - } else { - tenant_name = auth_tenant; - bucket_name = bucket; - } -} - -/** - * Get all the buckets owned by a user and fill up an RGWUserBuckets with them. - * Returns: 0 on success, -ERR# on failure. - */ -int rgw_read_user_buckets(RGWRados * store, - const rgw_user& user_id, - RGWUserBuckets& buckets, - const string& marker, - const string& end_marker, - uint64_t max, - bool need_stats, - bool *is_truncated, - uint64_t default_amount) -{ - int ret; - buckets.clear(); - std::string buckets_obj_id; - rgw_get_buckets_obj(user_id, buckets_obj_id); - rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id); - - bool truncated = false; - string m = marker; - - uint64_t total = 0; - - if (!max) { - max = default_amount; - } - - do { - std::list entries; - ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated); - if (ret == -ENOENT) { - ret = 0; - } - - if (ret < 0) { - return ret; - } - - for (auto& entry : entries) { - buckets.add(RGWBucketEnt(user_id, std::move(entry))); - total++; - } - - } while (truncated && total < max); - - if (is_truncated != nullptr) { - *is_truncated = truncated; - } - - if (need_stats) { - map& m = buckets.get_buckets(); - ret = store->update_containers_stats(m); - if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl; - return ret; - } - } - return 0; -} - -int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info) -{ - string buckets_obj_id; - rgw_get_buckets_obj(user_id, buckets_obj_id); - rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id); - - return store->cls_user_sync_bucket_stats(obj, bucket_info); -} - -int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name) -{ - RGWBucketInfo bucket_info; - RGWObjectCtx obj_ctx(store); - int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl; - return ret; - } - - ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl; - return ret; - } - - return 0; -} - -int rgw_link_bucket(RGWRados* const store, - const rgw_user& user_id, - rgw_bucket& bucket, - ceph::real_time creation_time, - bool update_entrypoint) -{ - int ret; - string& tenant_name = bucket.tenant; - string& bucket_name = bucket.name; - - cls_user_bucket_entry new_bucket; - - RGWBucketEntryPoint ep; - RGWObjVersionTracker ot; - - bucket.convert(&new_bucket.bucket); - new_bucket.size = 0; - if (real_clock::is_zero(creation_time)) - new_bucket.creation_time = real_clock::now(); - else - new_bucket.creation_time = creation_time; - - map attrs; - RGWObjectCtx obj_ctx(store); - - if (update_entrypoint) { - ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs); - if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: " - << cpp_strerror(-ret) << dendl; - } - } - - string buckets_obj_id; - rgw_get_buckets_obj(user_id, buckets_obj_id); - - rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id); - ret = store->cls_user_add_bucket(obj, new_bucket); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: " - << cpp_strerror(-ret) << dendl; - goto done_err; - } - - if (!update_entrypoint) - return 0; - - ep.linked = true; - ep.owner = user_id; - ep.bucket = bucket; - ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs); - if (ret < 0) - goto done_err; - - return 0; -done_err: - int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: " - << cpp_strerror(-r) << dendl; - } - return ret; -} - -int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint) -{ - int ret; - - string buckets_obj_id; - rgw_get_buckets_obj(user_id, buckets_obj_id); - - cls_user_bucket bucket; - bucket.name = bucket_name; - rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id); - ret = store->cls_user_remove_bucket(obj, bucket); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: " - << cpp_strerror(-ret)<< dendl; - } - - if (!update_entrypoint) - return 0; - - RGWBucketEntryPoint ep; - RGWObjVersionTracker ot; - map attrs; - RGWObjectCtx obj_ctx(store); - ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs); - if (ret == -ENOENT) - return 0; - if (ret < 0) - return ret; - - if (!ep.linked) - return 0; - - if (ep.owner != user_id) { - ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl; - return -EINVAL; - } - - ep.linked = false; - return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs); -} - -int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive, - map *pattrs, RGWObjVersionTracker *objv_tracker, - real_time mtime) { - return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs); -} - -int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive, - map *pattrs, RGWObjVersionTracker *objv_tracker, - real_time mtime) { - return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs); -} - -int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker) { - return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker); -} - -// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity -// with the existing instance[:shard] format. once we parse the shard, the / is -// replaced with a : to match the [tenant:]instance format -void rgw_bucket_instance_key_to_oid(string& key) -{ - // replace tenant/ with tenant: - auto c = key.find('/'); - if (c != string::npos) { - key[c] = ':'; - } -} - -// convert bucket instance oids back to the tenant/ format for metadata keys. -// it's safe to parse 'tenant:' only for oids, because they won't contain the -// optional :shard at the end -void rgw_bucket_instance_oid_to_key(string& oid) -{ - // find first : (could be tenant:bucket or bucket:instance) - auto c = oid.find(':'); - if (c != string::npos) { - // if we find another :, the first one was for tenant - if (oid.find(':', c + 1) != string::npos) { - oid[c] = '/'; - } - } -} - -int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id) -{ - ssize_t pos = bucket_instance.rfind(':'); - if (pos < 0) { - return -EINVAL; - } - - string first = bucket_instance.substr(0, pos); - string second = bucket_instance.substr(pos + 1); - - if (first.find(':') == string::npos) { - *shard_id = -1; - *target_bucket_instance = bucket_instance; - return 0; - } - - *target_bucket_instance = first; - string err; - *shard_id = strict_strtol(second.c_str(), 10, &err); - if (!err.empty()) { - return -EINVAL; - } - - return 0; -} - -// parse key in format: [tenant/]name:instance[:shard_id] -int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key, - rgw_bucket *bucket, int *shard_id) -{ - boost::string_ref name{key}; - boost::string_ref instance; - - // split tenant/name - auto pos = name.find('/'); - if (pos != boost::string_ref::npos) { - auto tenant = name.substr(0, pos); - bucket->tenant.assign(tenant.begin(), tenant.end()); - name = name.substr(pos + 1); - } - - // split name:instance - pos = name.find(':'); - if (pos != boost::string_ref::npos) { - instance = name.substr(pos + 1); - name = name.substr(0, pos); - } - bucket->name.assign(name.begin(), name.end()); - - // split instance:shard - pos = instance.find(':'); - if (pos == boost::string_ref::npos) { - bucket->bucket_id.assign(instance.begin(), instance.end()); - *shard_id = -1; - return 0; - } - - // parse shard id - auto shard = instance.substr(pos + 1); - string err; - auto id = strict_strtol(shard.data(), 10, &err); - if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed to parse bucket shard '" - << instance.data() << "': " << err << dendl; - return -EINVAL; - } - - *shard_id = id; - instance = instance.substr(0, pos); - bucket->bucket_id.assign(instance.begin(), instance.end()); - return 0; -} - -int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info, - map& attrs, - RGWObjVersionTracker *objv_tracker) -{ - rgw_bucket& bucket = bucket_info.bucket; - - if (!bucket_info.has_instance_obj) { - /* an old bucket object, need to convert it */ - RGWObjectCtx obj_ctx(store); - int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl; - return ret; - } - } - - /* we want the bucket instance name without the oid prefix cruft */ - string key = bucket.get_key(); - bufferlist bl; - - ::encode(bucket_info, bl); - - return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time()); -} - -static void dump_mulipart_index_results(list& objs_to_unlink, - Formatter *f) -{ - for (const auto& o : objs_to_unlink) { - f->dump_string("object", o.name); - } -} - -void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id, - bool fix) -{ - RGWUserBuckets user_buckets; - bool is_truncated = false; - string marker; - - CephContext *cct = store->ctx(); - - size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk; - - do { - int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, - string(), max_entries, false, - &is_truncated); - if (ret < 0) { - ldout(store->ctx(), 0) << "failed to read user buckets: " - << cpp_strerror(-ret) << dendl; - return; - } - - map& buckets = user_buckets.get_buckets(); - for (map::iterator i = buckets.begin(); - i != buckets.end(); - ++i) { - marker = i->first; - - RGWBucketEnt& bucket_ent = i->second; - rgw_bucket& bucket = bucket_ent.bucket; - - RGWBucketInfo bucket_info; - real_time mtime; - RGWObjectCtx obj_ctx(store); - int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl; - continue; - } - - rgw_bucket& actual_bucket = bucket_info.bucket; - - if (actual_bucket.name.compare(bucket.name) != 0 || - actual_bucket.tenant.compare(bucket.tenant) != 0 || - actual_bucket.marker.compare(bucket.marker) != 0 || - actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) { - cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl; - if (fix) { - cout << "fixing" << std::endl; - r = rgw_link_bucket(store, user_id, actual_bucket, - bucket_info.creation_time); - if (r < 0) { - cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl; - } - } - } - } - } while (is_truncated); -} - -static bool bucket_object_check_filter(const string& oid) -{ - rgw_obj_key key; - string ns; - return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns); -} - -int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key) -{ - RGWObjectCtx rctx(store); - - if (key.instance.empty()) { - key.instance = "null"; - } - - rgw_obj obj(bucket, key); - - return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status()); -} - -int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children) -{ - int ret; - map stats; - std::vector objs; - map common_prefixes; - RGWBucketInfo info; - RGWObjectCtx obj_ctx(store); - - string bucket_ver, master_ver; - - ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL); - if (ret < 0) - return ret; - - ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL); - if (ret < 0) - return ret; - - RGWRados::Bucket target(store, info); - RGWRados::Bucket::List list_op(&target); - CephContext *cct = store->ctx(); - int max = 1000; - - list_op.params.list_versions = true; - - do { - objs.clear(); - - ret = list_op.list_objects(max, &objs, &common_prefixes, NULL); - if (ret < 0) - return ret; - - if (!objs.empty() && !delete_children) { - lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl; - return -ENOTEMPTY; - } - - for (const auto& obj : objs) { - rgw_obj_key key(obj.key); - ret = rgw_remove_object(store, info, bucket, key); - if (ret < 0) - return ret; - } - - } while (!objs.empty()); - - string prefix, delimiter; - - ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter); - if (ret < 0) { - return ret; - } - - ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info); - if ( ret < 0) { - dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl; - } - - RGWObjVersionTracker objv_tracker; - - ret = store->delete_bucket(info, objv_tracker); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl; - return ret; - } - - ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl; - } - - return ret; -} - -static int aio_wait(librados::AioCompletion *handle) -{ - librados::AioCompletion *c = (librados::AioCompletion *)handle; - c->wait_for_safe(); - int ret = c->get_return_value(); - c->release(); - return ret; -} - -static int drain_handles(list& pending) -{ - int ret = 0; - while (!pending.empty()) { - librados::AioCompletion *handle = pending.front(); - pending.pop_front(); - int r = aio_wait(handle); - if (r < 0) { - ret = r; - } - } - return ret; -} - -int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket, - int concurrent_max, bool keep_index_consistent) -{ - int ret; - map stats; - std::vector objs; - map common_prefixes; - RGWBucketInfo info; - RGWObjectCtx obj_ctx(store); - CephContext *cct = store->ctx(); - - string bucket_ver, master_ver; - - ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL); - if (ret < 0) - return ret; - - ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL); - if (ret < 0) - return ret; - - string prefix, delimiter; - - ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter); - if (ret < 0) { - return ret; - } - - RGWRados::Bucket target(store, info); - RGWRados::Bucket::List list_op(&target); - - list_op.params.list_versions = true; - - std::list handles; - - int max = 1000; - int max_aio = concurrent_max; - ret = list_op.list_objects(max, &objs, &common_prefixes, NULL); - if (ret < 0) - return ret; - - while (!objs.empty()) { - std::vector::iterator it = objs.begin(); - for (; it != objs.end(); ++it) { - RGWObjState *astate = NULL; - rgw_obj obj(bucket, (*it).key); - - ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false); - if (ret == -ENOENT) { - dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl; - continue; - } - if (ret < 0) { - lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl; - return ret; - } - - if (astate->has_manifest) { - RGWObjManifest& manifest = astate->manifest; - RGWObjManifest::obj_iterator miter = manifest.obj_begin(); - rgw_obj head_obj = manifest.get_obj(); - rgw_raw_obj raw_head_obj; - store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj); - - - for (; miter != manifest.obj_end() && max_aio--; ++miter) { - if (!max_aio) { - ret = drain_handles(handles); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl; - return ret; - } - max_aio = concurrent_max; - } - - rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store); - if (last_obj == raw_head_obj) { - // have the head obj deleted at the end - continue; - } - - ret = store->delete_raw_obj_aio(last_obj, handles); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl; - return ret; - } - } // for all shadow objs - - ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl; - return ret; - } - } - - if (!max_aio) { - ret = drain_handles(handles); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl; - return ret; - } - max_aio = concurrent_max; - } - } // for all RGW objects - objs.clear(); - - ret = list_op.list_objects(max, &objs, &common_prefixes, NULL); - if (ret < 0) - return ret; - } - - ret = drain_handles(handles); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl; - return ret; - } - - ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info); - if (ret < 0) { - dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl; - } - - RGWObjVersionTracker objv_tracker; - - ret = rgw_bucket_delete_bucket_obj(store, bucket.tenant, bucket.name, objv_tracker); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << "with ret as " << ret << dendl; - return ret; - } - - if (!store->is_syncing_bucket_meta(bucket)) { - RGWObjVersionTracker objv_tracker; - string entry = bucket.get_key(); - ret = rgw_bucket_instance_remove_entry(store, entry, &objv_tracker); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: could not remove bucket instance entry" << bucket.name << "with ret as " << ret << dendl; - return ret; - } - } - - ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl; - } - - return ret; -} - -int rgw_bucket_delete_bucket_obj(RGWRados *store, - const string& tenant_name, - const string& bucket_name, - RGWObjVersionTracker& objv_tracker) -{ - string key; - - rgw_make_bucket_entry_name(tenant_name, bucket_name, key); - return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker); -} - -static void set_err_msg(std::string *sink, std::string msg) -{ - if (sink && !msg.empty()) - *sink = msg; -} - -int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state) -{ - if (!storage) - return -EINVAL; - - store = storage; - - rgw_user user_id = op_state.get_user_id(); - tenant = user_id.tenant; - bucket_name = op_state.get_bucket_name(); - RGWUserBuckets user_buckets; - RGWObjectCtx obj_ctx(store); - - if (bucket_name.empty() && user_id.empty()) - return -EINVAL; - - if (!bucket_name.empty()) { - int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL); - if (r < 0) { - ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl; - return r; - } - - op_state.set_bucket(bucket_info.bucket); - } - - if (!user_id.empty()) { - int r = rgw_get_user_info_by_uid(store, user_id, user_info); - if (r < 0) - return r; - - op_state.display_name = user_info.display_name; - } - - clear_failure(); - return 0; -} - -int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg) -{ - if (!op_state.is_user_op()) { - set_err_msg(err_msg, "empty user id"); - return -EINVAL; - } - - string bucket_id = op_state.get_bucket_id(); - if (bucket_id.empty()) { - set_err_msg(err_msg, "empty bucket instance id"); - return -EINVAL; - } - - std::string display_name = op_state.get_user_display_name(); - rgw_bucket bucket = op_state.get_bucket(); - - const rgw_pool& root_pool = store->get_zone_params().domain_root; - rgw_raw_obj obj(root_pool, bucket.name); - RGWObjVersionTracker objv_tracker; - - map attrs; - RGWBucketInfo bucket_info; - - string key = bucket.name + ":" + bucket_id; - RGWObjectCtx obj_ctx(store); - int r = store->get_bucket_instance_info(obj_ctx, key, bucket_info, NULL, &attrs); - if (r < 0) { - return r; - } - - rgw_user user_id = op_state.get_user_id(); - - map::iterator aiter = attrs.find(RGW_ATTR_ACL); - if (aiter != attrs.end()) { - bufferlist aclbl = aiter->second; - RGWAccessControlPolicy policy; - ACLOwner owner; - try { - bufferlist::iterator iter = aclbl.begin(); - ::decode(policy, iter); - owner = policy.get_owner(); - } catch (buffer::error& err) { - set_err_msg(err_msg, "couldn't decode policy"); - return -EIO; - } - - r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false); - if (r < 0) { - set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str()); - return r; - } - - // now update the user for the bucket... - if (display_name.empty()) { - ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl; - } - policy.create_default(user_info.user_id, display_name); - - owner = policy.get_owner(); - r = store->set_bucket_owner(bucket_info.bucket, owner); - if (r < 0) { - set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r)); - return r; - } - - // ...and encode the acl - aclbl.clear(); - policy.encode(aclbl); - - r = store->system_obj_set_attr(NULL, obj, RGW_ATTR_ACL, aclbl, &objv_tracker); - if (r < 0) { - return r; - } - - RGWAccessControlPolicy policy_instance; - policy_instance.create_default(user_info.user_id, display_name); - aclbl.clear(); - policy_instance.encode(aclbl); - - string oid_bucket_instance = RGW_BUCKET_INSTANCE_MD_PREFIX + key; - rgw_raw_obj obj_bucket_instance(root_pool, oid_bucket_instance); - r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker); - if (r < 0) { - return r; - } - - r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket, - ceph::real_time()); - if (r < 0) { - return r; - } - } - - return 0; -} - -int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg) -{ - rgw_bucket bucket = op_state.get_bucket(); - - if (!op_state.is_user_op()) { - set_err_msg(err_msg, "could not fetch user or user bucket info"); - return -EINVAL; - } - - int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name); - if (r < 0) { - set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r)); - } - - return r; -} - -int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc, - bool keep_index_consistent, std::string *err_msg) -{ - bool delete_children = op_state.will_delete_children(); - rgw_bucket bucket = op_state.get_bucket(); - int ret; - - if (bypass_gc) { - if (delete_children) { - ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent); - } else { - set_err_msg(err_msg, "purge objects should be set for gc to be bypassed"); - return -EINVAL; - } - } else { - ret = rgw_remove_bucket(store, bucket, delete_children); - } - - if (ret < 0) { - set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret)); - return ret; - } - - return 0; -} - -int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg) -{ - rgw_bucket bucket = op_state.get_bucket(); - std::string object_name = op_state.get_object_name(); - - rgw_obj_key key(object_name); - - int ret = rgw_remove_object(store, bucket_info, bucket, key); - if (ret < 0) { - set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret)); - return ret; - } - - return 0; -} - -static void dump_bucket_index(map result, Formatter *f) -{ - map::iterator iter; - for (iter = result.begin(); iter != result.end(); ++iter) { - f->dump_string("object", iter->first); - } -} - -static void dump_bucket_usage(map& stats, Formatter *formatter) -{ - map::iterator iter; - - formatter->open_object_section("usage"); - for (iter = stats.begin(); iter != stats.end(); ++iter) { - RGWStorageStats& s = iter->second; - const char *cat_name = rgw_obj_category_name(iter->first); - formatter->open_object_section(cat_name); - s.dump(formatter); - formatter->close_section(); - } - formatter->close_section(); -} - -static void dump_index_check(map existing_stats, - map calculated_stats, - Formatter *formatter) -{ - formatter->open_object_section("check_result"); - formatter->open_object_section("existing_header"); - dump_bucket_usage(existing_stats, formatter); - formatter->close_section(); - formatter->open_object_section("calculated_header"); - dump_bucket_usage(calculated_stats, formatter); - formatter->close_section(); - formatter->close_section(); -} - -int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher ,std::string *err_msg) -{ - bool fix_index = op_state.will_fix_index(); - rgw_bucket bucket = op_state.get_bucket(); - - size_t max = 1000; - - map common_prefixes; - - bool is_truncated; - map meta_objs; - map all_objs; - - RGWBucketInfo bucket_info; - RGWObjectCtx obj_ctx(store); - int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl; - return r; - } - - RGWRados::Bucket target(store, bucket_info); - RGWRados::Bucket::List list_op(&target); - - list_op.params.list_versions = true; - list_op.params.ns = RGW_OBJ_NS_MULTIPART; - - do { - vector result; - int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated); - if (r < 0) { - set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name + - " err=" + cpp_strerror(-r)); - - return r; - } - - vector::iterator iter; - for (iter = result.begin(); iter != result.end(); ++iter) { - rgw_obj_index_key key = iter->key; - rgw_obj obj(bucket, key); - string oid = obj.get_oid(); - - int pos = oid.find_last_of('.'); - if (pos < 0) { - /* obj has no suffix */ - all_objs[key] = oid; - } else { - /* obj has suffix */ - string name = oid.substr(0, pos); - string suffix = oid.substr(pos + 1); - - if (suffix.compare("meta") == 0) { - meta_objs[name] = true; - } else { - all_objs[key] = name; - } - } - } - - } while (is_truncated); - - list objs_to_unlink; - Formatter *f = flusher.get_formatter(); - - f->open_array_section("invalid_multipart_entries"); - - for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) { - string& name = aiter->second; - - if (meta_objs.find(name) == meta_objs.end()) { - objs_to_unlink.push_back(aiter->first); - } - - if (objs_to_unlink.size() > max) { - if (fix_index) { - int r = store->remove_objs_from_index(bucket_info, objs_to_unlink); - if (r < 0) { - set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " + - cpp_strerror(-r)); - return r; - } - } - - dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter()); - flusher.flush(); - objs_to_unlink.clear(); - } - } - - if (fix_index) { - int r = store->remove_objs_from_index(bucket_info, objs_to_unlink); - if (r < 0) { - set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " + - cpp_strerror(-r)); - - return r; - } - } - - dump_mulipart_index_results(objs_to_unlink, f); - f->close_section(); - flusher.flush(); - - return 0; -} - -int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher, - std::string *err_msg) -{ - - bool fix_index = op_state.will_fix_index(); - - rgw_bucket bucket = op_state.get_bucket(); - - if (!fix_index) { - set_err_msg(err_msg, "check-objects flag requires fix index enabled"); - return -EINVAL; - } - - store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT); - - string prefix; - rgw_obj_index_key marker; - bool is_truncated = true; - - Formatter *formatter = flusher.get_formatter(); - formatter->open_object_section("objects"); - while (is_truncated) { - map result; - - int r = store->cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, 1000, true, - result, &is_truncated, &marker, - bucket_object_check_filter); - if (r == -ENOENT) { - break; - } else if (r < 0 && r != -ENOENT) { - set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r)); - } - - - dump_bucket_index(result, formatter); - flusher.flush(); - - } - - formatter->close_section(); - - store->cls_obj_set_bucket_tag_timeout(bucket_info, 0); - - return 0; -} - - -int RGWBucket::check_index(RGWBucketAdminOpState& op_state, - map& existing_stats, - map& calculated_stats, - std::string *err_msg) -{ - rgw_bucket bucket = op_state.get_bucket(); - bool fix_index = op_state.will_fix_index(); - - int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats); - if (r < 0) { - set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r)); - return r; - } - - if (fix_index) { - r = store->bucket_rebuild_index(bucket_info); - if (r < 0) { - set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r)); - return r; - } - } - - return 0; -} - - -int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o) -{ - RGWAccessControlPolicy_S3 policy(g_ceph_context); - bufferlist::iterator iter = bl.begin(); - try { - policy.decode(iter); - } catch (buffer::error& err) { - dout(0) << "ERROR: caught buffer::error, could not decode policy" << dendl; - return -EIO; - } - policy.to_xml(o); - return 0; -} - -static int policy_decode(RGWRados *store, bufferlist& bl, RGWAccessControlPolicy& policy) -{ - bufferlist::iterator iter = bl.begin(); - try { - policy.decode(iter); - } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: caught buffer::error, could not decode policy" << dendl; - return -EIO; - } - return 0; -} - -int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy) -{ - std::string object_name = op_state.get_object_name(); - rgw_bucket bucket = op_state.get_bucket(); - RGWObjectCtx obj_ctx(store); - - RGWBucketInfo bucket_info; - map attrs; - int ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs); - if (ret < 0) { - return ret; - } - - if (!object_name.empty()) { - bufferlist bl; - rgw_obj obj(bucket, object_name); - - RGWRados::Object op_target(store, bucket_info, obj_ctx, obj); - RGWRados::Object::Read rop(&op_target); - - int ret = rop.get_attr(RGW_ATTR_ACL, bl); - if (ret < 0) - return ret; - - return policy_decode(store, bl, policy); - } - - map::iterator aiter = attrs.find(RGW_ATTR_ACL); - if (aiter == attrs.end()) { - return -ENOENT; - } - - return policy_decode(store, aiter->second, policy); -} - - -int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state, - RGWAccessControlPolicy& policy) -{ - RGWBucket bucket; - - int ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - - ret = bucket.get_policy(op_state, policy); - if (ret < 0) - return ret; - - return 0; -} - -/* Wrappers to facilitate RESTful interface */ - - -int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher) -{ - RGWAccessControlPolicy policy(store->ctx()); - - int ret = get_policy(store, op_state, policy); - if (ret < 0) - return ret; - - Formatter *formatter = flusher.get_formatter(); - - flusher.start(0); - - formatter->open_object_section("policy"); - policy.dump(formatter); - formatter->close_section(); - - flusher.flush(); - - return 0; -} - -int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state, - ostream& os) -{ - RGWAccessControlPolicy_S3 policy(store->ctx()); - - int ret = get_policy(store, op_state, policy); - if (ret < 0) - return ret; - - policy.to_xml(os); - - return 0; -} - -int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state) -{ - RGWBucket bucket; - - int ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - - return bucket.unlink(op_state); -} - -int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err) -{ - RGWBucket bucket; - - int ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - - return bucket.link(op_state, err); - -} - -int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher) -{ - int ret; - map existing_stats; - map calculated_stats; - - - RGWBucket bucket; - - ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - - Formatter *formatter = flusher.get_formatter(); - flusher.start(0); - - ret = bucket.check_bad_index_multipart(op_state, flusher); - if (ret < 0) - return ret; - - ret = bucket.check_object_index(op_state, flusher); - if (ret < 0) - return ret; - - ret = bucket.check_index(op_state, existing_stats, calculated_stats); - if (ret < 0) - return ret; - - dump_index_check(existing_stats, calculated_stats, formatter); - flusher.flush(); - - return 0; -} - -int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state, - bool bypass_gc, bool keep_index_consistent) -{ - RGWBucket bucket; - - int ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - - std::string err_msg; - ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg); - if (!err_msg.empty()) { - lderr(store->ctx()) << "ERROR: " << err_msg << dendl; - } - return ret; -} - -int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state) -{ - RGWBucket bucket; - - int ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - - return bucket.remove_object(op_state); -} - -static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter) -{ - RGWBucketInfo bucket_info; - map stats; - - real_time mtime; - RGWObjectCtx obj_ctx(store); - int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime); - if (r < 0) - return r; - - rgw_bucket& bucket = bucket_info.bucket; - - string bucket_ver, master_ver; - string max_marker; - int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker); - if (ret < 0) { - cerr << "error getting bucket stats ret=" << ret << std::endl; - return ret; - } - - utime_t ut(mtime); - - formatter->open_object_section("stats"); - formatter->dump_string("bucket", bucket.name); - formatter->dump_string("zonegroup", bucket_info.zonegroup); - formatter->dump_string("placement_rule", bucket_info.placement_rule); - ::encode_json("explicit_placement", bucket.explicit_placement, formatter); - formatter->dump_string("id", bucket.bucket_id); - formatter->dump_string("marker", bucket.marker); - formatter->dump_stream("index_type") << bucket_info.index_type; - ::encode_json("owner", bucket_info.owner, formatter); - formatter->dump_string("ver", bucket_ver); - formatter->dump_string("master_ver", master_ver); - formatter->dump_stream("mtime") << ut; - formatter->dump_string("max_marker", max_marker); - dump_bucket_usage(stats, formatter); - encode_json("bucket_quota", bucket_info.quota, formatter); - formatter->close_section(); - - return 0; -} - -int RGWBucketAdminOp::limit_check(RGWRados *store, - RGWBucketAdminOpState& op_state, - const std::list& user_ids, - RGWFormatterFlusher& flusher, - bool warnings_only) -{ - int ret = 0; - const size_t max_entries = - store->ctx()->_conf->rgw_list_buckets_max_chunk; - - const size_t safe_max_objs_per_shard = - store->ctx()->_conf->rgw_safe_max_objects_per_shard; - - uint16_t shard_warn_pct = - store->ctx()->_conf->rgw_shard_warning_threshold; - if (shard_warn_pct > 100) - shard_warn_pct = 90; - - Formatter *formatter = flusher.get_formatter(); - flusher.start(0); - - formatter->open_array_section("users"); - - for (const auto& user_id : user_ids) { - formatter->open_object_section("user"); - formatter->dump_string("user_id", user_id); - bool done; - formatter->open_array_section("buckets"); - do { - RGWUserBuckets buckets; - string marker; - bool is_truncated; - - ret = rgw_read_user_buckets(store, user_id, buckets, - marker, string(), max_entries, false, - &is_truncated); - if (ret < 0) - return ret; - - map& m_buckets = buckets.get_buckets(); - - for (const auto& iter : m_buckets) { - auto& bucket = iter.second.bucket; - uint32_t num_shards = 1; - uint64_t num_objects = 0; - - /* need info for num_shards */ - RGWBucketInfo info; - RGWObjectCtx obj_ctx(store); - - marker = bucket.name; /* Casey's location for marker update, - * as we may now not reach the end of - * the loop body */ - - ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, - info, nullptr); - if (ret < 0) - continue; - - /* need stats for num_entries */ - string bucket_ver, master_ver; - std::map stats; - ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, - &master_ver, stats, nullptr); - - if (ret < 0) - continue; - - for (const auto& s : stats) { - num_objects += s.second.num_objects; - } - - num_shards = info.num_shards; - uint64_t objs_per_shard = - (num_shards) ? num_objects/num_shards : num_objects; - { - bool warn = false; - stringstream ss; - if (objs_per_shard > safe_max_objs_per_shard) { - double over = - 100 - (safe_max_objs_per_shard/objs_per_shard * 100); - ss << boost::format("OVER %4f%%") % over; - warn = true; - } else { - double fill_pct = - objs_per_shard / safe_max_objs_per_shard * 100; - if (fill_pct >= shard_warn_pct) { - ss << boost::format("WARN %4f%%") % fill_pct; - warn = true; - } else { - ss << "OK"; - } - } - - if (warn || (! warnings_only)) { - formatter->open_object_section("bucket"); - formatter->dump_string("bucket", bucket.name); - formatter->dump_string("tenant", bucket.tenant); - formatter->dump_int("num_objects", num_objects); - formatter->dump_int("num_shards", num_shards); - formatter->dump_int("objects_per_shard", objs_per_shard); - formatter->dump_string("fill_status", ss.str()); - formatter->close_section(); - } - } - } - - done = (m_buckets.size() < max_entries); - } while (!done); /* foreach: bucket */ - - formatter->close_section(); - formatter->close_section(); - formatter->flush(cout); - - } /* foreach: user_id */ - - formatter->close_section(); - formatter->flush(cout); - - return ret; -} /* RGWBucketAdminOp::limit_check */ - -int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state, - RGWFormatterFlusher& flusher) -{ - RGWBucket bucket; - int ret; - - string bucket_name = op_state.get_bucket_name(); - - if (!bucket_name.empty()) { - ret = bucket.init(store, op_state); - if (ret < 0) - return ret; - } - - Formatter *formatter = flusher.get_formatter(); - flusher.start(0); - - CephContext *cct = store->ctx(); - - const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk; - - bool show_stats = op_state.will_fetch_stats(); - rgw_user user_id = op_state.get_user_id(); - if (op_state.is_user_op()) { - formatter->open_array_section("buckets"); - - RGWUserBuckets buckets; - string marker; - bool is_truncated = false; - - do { - ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets, - marker, string(), max_entries, false, - &is_truncated); - if (ret < 0) - return ret; - - map& m = buckets.get_buckets(); - map::iterator iter; - - for (iter = m.begin(); iter != m.end(); ++iter) { - std::string obj_name = iter->first; - if (show_stats) - bucket_stats(store, user_id.tenant, obj_name, formatter); - else - formatter->dump_string("bucket", obj_name); - - marker = obj_name; - } - - flusher.flush(); - } while (is_truncated); - - formatter->close_section(); - } else if (!bucket_name.empty()) { - bucket_stats(store, user_id.tenant, bucket_name, formatter); - } else { - RGWAccessHandle handle; - - formatter->open_array_section("buckets"); - if (store->list_buckets_init(&handle) >= 0) { - rgw_bucket_dir_entry obj; - while (store->list_buckets_next(obj, &handle) >= 0) { - if (show_stats) - bucket_stats(store, user_id.tenant, obj.key.name, formatter); - else - formatter->dump_string("bucket", obj.key.name); - } - } - - formatter->close_section(); - } - - flusher.flush(); - - return 0; -} - - -void rgw_data_change::dump(Formatter *f) const -{ - string type; - switch (entity_type) { - case ENTITY_TYPE_BUCKET: - type = "bucket"; - break; - default: - type = "unknown"; - } - encode_json("entity_type", type, f); - encode_json("key", key, f); - utime_t ut(timestamp); - encode_json("timestamp", ut, f); -} - -void rgw_data_change::decode_json(JSONObj *obj) { - string s; - JSONDecoder::decode_json("entity_type", s, obj); - if (s == "bucket") { - entity_type = ENTITY_TYPE_BUCKET; - } else { - entity_type = ENTITY_TYPE_UNKNOWN; - } - JSONDecoder::decode_json("key", key, obj); - utime_t ut; - JSONDecoder::decode_json("timestamp", ut, obj); - timestamp = ut.to_real_time(); -} - -void rgw_data_change_log_entry::dump(Formatter *f) const -{ - encode_json("log_id", log_id, f); - utime_t ut(log_timestamp); - encode_json("log_timestamp", ut, f); - encode_json("entry", entry, f); -} - -void rgw_data_change_log_entry::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("log_id", log_id, obj); - utime_t ut; - JSONDecoder::decode_json("log_timestamp", ut, obj); - log_timestamp = ut.to_real_time(); - JSONDecoder::decode_json("entry", entry, obj); -} - -int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { - const string& name = bs.bucket.name; - int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); - uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards; - - return (int)r; -} - -int RGWDataChangesLog::renew_entries() -{ - if (!store->need_to_log_data()) - return 0; - - /* we can't keep the bucket name as part of the cls_log_entry, and we need - * it later, so we keep two lists under the map */ - map, list > > m; - - lock.Lock(); - map entries; - entries.swap(cur_cycle); - lock.Unlock(); - - map::iterator iter; - string section; - real_time ut = real_clock::now(); - for (iter = entries.begin(); iter != entries.end(); ++iter) { - const rgw_bucket_shard& bs = iter->first; - - int index = choose_oid(bs); - - cls_log_entry entry; - - rgw_data_change change; - bufferlist bl; - change.entity_type = ENTITY_TYPE_BUCKET; - change.key = bs.get_key(); - change.timestamp = ut; - ::encode(change, bl); - - store->time_log_prepare_entry(entry, ut, section, change.key, bl); - - m[index].first.push_back(bs); - m[index].second.emplace_back(std::move(entry)); - } - - map, list > >::iterator miter; - for (miter = m.begin(); miter != m.end(); ++miter) { - list& entries = miter->second.second; - - real_time now = real_clock::now(); - - int ret = store->time_log_add(oids[miter->first], entries, NULL); - if (ret < 0) { - /* we don't really need to have a special handling for failed cases here, - * as this is just an optimization. */ - lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl; - return ret; - } - - real_time expiration = now; - expiration += make_timespan(cct->_conf->rgw_data_log_window); - - list& buckets = miter->second.first; - list::iterator liter; - for (liter = buckets.begin(); liter != buckets.end(); ++liter) { - update_renewed(*liter, expiration); - } - } - - return 0; -} - -void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status) -{ - assert(lock.is_locked()); - if (!changes.find(bs, status)) { - status = ChangeStatusPtr(new ChangeStatus); - changes.add(bs, status); - } -} - -void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs) -{ - Mutex::Locker l(lock); - cur_cycle[bs] = true; -} - -void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration) -{ - Mutex::Locker l(lock); - ChangeStatusPtr status; - _get_change(bs, status); - - ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl; - status->cur_expiration = expiration; -} - -int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) { - rgw_bucket_shard bs(bucket, shard_id); - - return choose_oid(bs); -} - -int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { - if (!store->need_to_log_data()) - return 0; - - rgw_bucket_shard bs(bucket, shard_id); - - int index = choose_oid(bs); - mark_modified(index, bs); - - lock.Lock(); - - ChangeStatusPtr status; - _get_change(bs, status); - - lock.Unlock(); - - real_time now = real_clock::now(); - - status->lock->Lock(); - - ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; - - if (now < status->cur_expiration) { - /* no need to send, recently completed */ - status->lock->Unlock(); - - register_renew(bs); - return 0; - } - - RefCountedCond *cond; - - if (status->pending) { - cond = status->cond; - - assert(cond); - - status->cond->get(); - status->lock->Unlock(); - - int ret = cond->wait(); - cond->put(); - if (!ret) { - register_renew(bs); - } - return ret; - } - - status->cond = new RefCountedCond; - status->pending = true; - - string& oid = oids[index]; - real_time expiration; - - int ret; - - do { - status->cur_sent = now; - - expiration = now; - expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); - - status->lock->Unlock(); - - bufferlist bl; - rgw_data_change change; - change.entity_type = ENTITY_TYPE_BUCKET; - change.key = bs.get_key(); - change.timestamp = now; - ::encode(change, bl); - string section; - - ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; - - ret = store->time_log_add(oid, now, section, change.key, bl); - - now = real_clock::now(); - - status->lock->Lock(); - - } while (!ret && real_clock::now() > expiration); - - cond = status->cond; - - status->pending = false; - status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */ - status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window); - status->cond = NULL; - status->lock->Unlock(); - - cond->done(ret); - cond->put(); - - return ret; -} - -int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries, - list& entries, - const string& marker, - string *out_marker, - bool *truncated) { - if (shard >= num_shards) - return -EINVAL; - - list log_entries; - - int ret = store->time_log_list(oids[shard], start_time, end_time, - max_entries, log_entries, marker, - out_marker, truncated); - if (ret < 0) - return ret; - - list::iterator iter; - for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) { - rgw_data_change_log_entry log_entry; - log_entry.log_id = iter->id; - real_time rt = iter->timestamp.to_real_time(); - log_entry.log_timestamp = rt; - bufferlist::iterator liter = iter->data.begin(); - try { - ::decode(log_entry.entry, liter); - } catch (buffer::error& err) { - lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl; - return -EIO; - } - entries.push_back(log_entry); - } - - return 0; -} - -int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries, - list& entries, LogMarker& marker, bool *ptruncated) { - bool truncated; - entries.clear(); - - for (; marker.shard < num_shards && (int)entries.size() < max_entries; - marker.shard++, marker.marker.clear()) { - int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries, - marker.marker, NULL, &truncated); - if (ret == -ENOENT) { - continue; - } - if (ret < 0) { - return ret; - } - if (truncated) { - *ptruncated = true; - return 0; - } - } - - *ptruncated = (marker.shard < num_shards); - - return 0; -} - -int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info) -{ - if (shard_id >= num_shards) - return -EINVAL; - - string oid = oids[shard_id]; - - cls_log_header header; - - int ret = store->time_log_info(oid, &header); - if ((ret < 0) && (ret != -ENOENT)) - return ret; - - info->marker = header.max_marker; - info->last_update = header.max_time.to_real_time(); - - return 0; -} - -int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time, - const string& start_marker, const string& end_marker) -{ - int ret; - - if (shard_id > num_shards) - return -EINVAL; - - ret = store->time_log_trim(oids[shard_id], start_time, end_time, start_marker, end_marker); - - if (ret == -ENOENT || ret == -ENODATA) - ret = 0; - - return ret; -} - -int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time& end_time, - const string& start_marker, const string& end_marker) -{ - for (int shard = 0; shard < num_shards; shard++) { - int ret = store->time_log_trim(oids[shard], start_time, end_time, start_marker, end_marker); - if (ret == -ENOENT || ret == -ENODATA) { - continue; - } - if (ret < 0) - return ret; - } - - return 0; -} - -bool RGWDataChangesLog::going_down() -{ - return down_flag; -} - -RGWDataChangesLog::~RGWDataChangesLog() { - down_flag = true; - renew_thread->stop(); - renew_thread->join(); - delete renew_thread; - delete[] oids; -} - -void *RGWDataChangesLog::ChangesRenewThread::entry() { - do { - dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl; - int r = log->renew_entries(); - if (r < 0) { - dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl; - } - - if (log->going_down()) - break; - - int interval = cct->_conf->rgw_data_log_window * 3 / 4; - lock.Lock(); - cond.WaitInterval(lock, utime_t(interval, 0)); - lock.Unlock(); - } while (!log->going_down()); - - return NULL; -} - -void RGWDataChangesLog::ChangesRenewThread::stop() -{ - Mutex::Locker l(lock); - cond.Signal(); -} - -void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs) -{ - auto key = bs.get_key(); - modified_lock.get_read(); - map >::iterator iter = modified_shards.find(shard_id); - if (iter != modified_shards.end()) { - set& keys = iter->second; - if (keys.find(key) != keys.end()) { - modified_lock.unlock(); - return; - } - } - modified_lock.unlock(); - - RWLock::WLocker wl(modified_lock); - modified_shards[shard_id].insert(key); -} - -void RGWDataChangesLog::read_clear_modified(map > &modified) -{ - RWLock::WLocker wl(modified_lock); - modified.swap(modified_shards); - modified_shards.clear(); -} - -void RGWBucketCompleteInfo::dump(Formatter *f) const { - encode_json("bucket_info", info, f); - encode_json("attrs", attrs, f); -} - -void RGWBucketCompleteInfo::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("bucket_info", info, obj); - JSONDecoder::decode_json("attrs", attrs, obj); -} - -class RGWBucketMetadataHandler : public RGWMetadataHandler { - -public: - string get_type() override { return "bucket"; } - - int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override { - RGWObjVersionTracker ot; - RGWBucketEntryPoint be; - - real_time mtime; - map attrs; - RGWObjectCtx obj_ctx(store); - - string tenant_name, bucket_name; - parse_bucket(entry, &tenant_name, &bucket_name); - int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs); - if (ret < 0) - return ret; - - RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime); - - *obj = mdo; - - return 0; - } - - int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, - real_time mtime, JSONObj *obj, sync_type_t sync_type) override { - RGWBucketEntryPoint be, old_be; - try { - decode_json_obj(be, obj); - } catch (JSONDecoder::err& e) { - return -EINVAL; - } - - real_time orig_mtime; - map attrs; - - RGWObjVersionTracker old_ot; - RGWObjectCtx obj_ctx(store); - - string tenant_name, bucket_name; - parse_bucket(entry, &tenant_name, &bucket_name); - int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs); - if (ret < 0 && ret != -ENOENT) - return ret; - - // are we actually going to perform this put, or is it too old? - if (ret != -ENOENT && - !check_versions(old_ot.read_version, orig_mtime, - objv_tracker.write_version, mtime, sync_type)) { - return STATUS_NO_APPLY; - } - - objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */ - - ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs); - if (ret < 0) - return ret; - - /* link bucket */ - if (be.linked) { - ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false); - } else { - ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant, - be.bucket.name, false); - } - - return ret; - } - - struct list_keys_info { - RGWRados *store; - RGWListRawObjsCtx ctx; - }; - - int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override { - RGWBucketEntryPoint be; - RGWObjectCtx obj_ctx(store); - - string tenant_name, bucket_name; - parse_bucket(entry, &tenant_name, &bucket_name); - int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL); - if (ret < 0) - return ret; - - /* - * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing - * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal - * will incorrectly fail. - */ - ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false); - if (ret < 0) { - lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl; - } - - ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker); - if (ret < 0) { - lderr(store->ctx()) << "could not delete bucket=" << entry << dendl; - } - /* idempotent */ - return 0; - } - - void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override { - oid = key; - pool = store->get_zone_params().domain_root; - } - - int list_keys_init(RGWRados *store, const string& marker, void **phandle) override { - auto info = ceph::make_unique(); - - info->store = store; - - int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker, - &info->ctx); - if (ret < 0) { - return ret; - } - *phandle = (void *)info.release(); - - return 0; - } - - int list_keys_next(void *handle, int max, list& keys, bool *truncated) override { - list_keys_info *info = static_cast(handle); - - string no_filter; - - keys.clear(); - - RGWRados *store = info->store; - - list unfiltered_keys; - - int ret = store->list_raw_objects_next(no_filter, max, info->ctx, - unfiltered_keys, truncated); - if (ret < 0 && ret != -ENOENT) - return ret; - if (ret == -ENOENT) { - if (truncated) - *truncated = false; - return 0; - } - - // now filter out the system entries - list::iterator iter; - for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) { - string& k = *iter; - - if (k[0] != '.') { - keys.push_back(k); - } - } - - return 0; - } - - void list_keys_complete(void *handle) override { - list_keys_info *info = static_cast(handle); - delete info; - } - - string get_marker(void *handle) { - list_keys_info *info = static_cast(handle); - return info->store->list_raw_objs_get_cursor(info->ctx); - } -}; - -class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler { - -public: - string get_type() override { return "bucket.instance"; } - - int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override { - RGWBucketCompleteInfo bci; - - real_time mtime; - RGWObjectCtx obj_ctx(store); - - int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs); - if (ret < 0) - return ret; - - RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime); - - *obj = mdo; - - return 0; - } - - int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker, - real_time mtime, JSONObj *obj, sync_type_t sync_type) override { - RGWBucketCompleteInfo bci, old_bci; - try { - decode_json_obj(bci, obj); - } catch (JSONDecoder::err& e) { - return -EINVAL; - } - - real_time orig_mtime; - RGWObjectCtx obj_ctx(store); - - int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info, - &orig_mtime, &old_bci.attrs); - bool exists = (ret != -ENOENT); - if (ret < 0 && exists) - return ret; - - if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) { - /* a new bucket, we need to select a new bucket placement for it */ - auto key(entry); - rgw_bucket_instance_oid_to_key(key); - string tenant_name; - string bucket_name; - string bucket_instance; - parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance); - - RGWZonePlacementInfo rule_info; - bci.info.bucket.name = bucket_name; - bci.info.bucket.bucket_id = bucket_instance; - bci.info.bucket.tenant = tenant_name; - ret = store->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl; - return ret; - } - bci.info.index_type = rule_info.index_type; - } else { - /* existing bucket, keep its placement */ - bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement; - bci.info.placement_rule = old_bci.info.placement_rule; - } - - if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) { - int shards_num = bci.info.num_shards? bci.info.num_shards : 1; - int shard_id = bci.info.num_shards? 0 : -1; - - if (!bci.info.datasync_flag_enabled()) { - ret = store->stop_bi_log_entries(bci.info, -1); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl; - return ret; - } - } else { - ret = store->resync_bi_log_entries(bci.info, -1); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl; - return ret; - } - } - - for (int i = 0; i < shards_num; ++i, ++shard_id) { - ret = store->data_log->add_entry(bci.info.bucket, shard_id); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; - return ret; - } - } - } - - // are we actually going to perform this put, or is it too old? - if (exists && - !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime, - objv_tracker.write_version, mtime, sync_type)) { - objv_tracker.read_version = old_bci.info.objv_tracker.read_version; - return STATUS_NO_APPLY; - } - - /* record the read version (if any), store the new version */ - bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version; - bci.info.objv_tracker.write_version = objv_tracker.write_version; - - ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs); - if (ret < 0) - return ret; - - objv_tracker = bci.info.objv_tracker; - - ret = store->init_bucket_index(bci.info, bci.info.num_shards); - if (ret < 0) - return ret; - - return STATUS_APPLIED; - } - - struct list_keys_info { - RGWRados *store; - RGWListRawObjsCtx ctx; - }; - - int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override { - RGWBucketInfo info; - RGWObjectCtx obj_ctx(store); - - int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL); - if (ret < 0 && ret != -ENOENT) - return ret; - - return rgw_bucket_instance_remove_entry(store, entry, &info.objv_tracker); - } - - void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override { - oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key; - rgw_bucket_instance_key_to_oid(oid); - pool = store->get_zone_params().domain_root; - } - - int list_keys_init(RGWRados *store, const string& marker, void **phandle) override { - auto info = ceph::make_unique(); - - info->store = store; - - int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker, - &info->ctx); - if (ret < 0) { - return ret; - } - *phandle = (void *)info.release(); - - return 0; - } - - int list_keys_next(void *handle, int max, list& keys, bool *truncated) override { - list_keys_info *info = static_cast(handle); - - string no_filter; - - keys.clear(); - - RGWRados *store = info->store; - - list unfiltered_keys; - - int ret = store->list_raw_objects_next(no_filter, max, info->ctx, - unfiltered_keys, truncated); - if (ret < 0 && ret != -ENOENT) - return ret; - if (ret == -ENOENT) { - if (truncated) - *truncated = false; - return 0; - } - - constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1; - // now filter in the relevant entries - list::iterator iter; - for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) { - string& k = *iter; - - if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) { - auto oid = k.substr(prefix_size); - rgw_bucket_instance_oid_to_key(oid); - keys.emplace_back(std::move(oid)); - } - } - - return 0; - } - - void list_keys_complete(void *handle) override { - list_keys_info *info = static_cast(handle); - delete info; - } - - string get_marker(void *handle) { - list_keys_info *info = static_cast(handle); - return info->store->list_raw_objs_get_cursor(info->ctx); - } - - /* - * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry - * point, so that the log entries end up at the same log shard, so that we process them - * in order - */ - void get_hash_key(const string& section, const string& key, string& hash_key) override { - string k; - int pos = key.find(':'); - if (pos < 0) - k = key; - else - k = key.substr(0, pos); - hash_key = "bucket:" + k; - } -}; - -void rgw_bucket_init(RGWMetadataManager *mm) -{ - bucket_meta_handler = new RGWBucketMetadataHandler; - mm->register_handler(bucket_meta_handler); - bucket_instance_meta_handler = new RGWBucketInstanceMetadataHandler; - mm->register_handler(bucket_instance_meta_handler); -}