// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #ifndef CEPH_RGW_BUCKET_H #define CEPH_RGW_BUCKET_H #include #include #include "include/types.h" #include "rgw_common.h" #include "rgw_tools.h" #include "rgw_rados.h" #include "rgw_string.h" #include "common/Formatter.h" #include "common/lru_map.h" #include "common/ceph_time.h" #include "rgw_formats.h" // define as static when RGWBucket implementation compete extern void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id); extern int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive, map *pattrs, RGWObjVersionTracker *objv_tracker, real_time mtime); extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferlist& bl, bool exclusive, map *pattrs, RGWObjVersionTracker *objv_tracker, real_time mtime); extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id); extern int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key, rgw_bucket* bucket, int *shard_id); extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker); extern void rgw_bucket_instance_key_to_oid(string& key); extern void rgw_bucket_instance_oid_to_key(string& oid); extern int rgw_bucket_delete_bucket_obj(RGWRados *store, const string& tenant_name, const string& bucket_name, RGWObjVersionTracker& objv_tracker); extern int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info); extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name); extern std::string rgw_make_bucket_entry_name(const std::string& tenant_name, const std::string& bucket_name); static inline void rgw_make_bucket_entry_name(const string& tenant_name, const string& bucket_name, std::string& bucket_entry) { bucket_entry = rgw_make_bucket_entry_name(tenant_name, bucket_name); } extern void rgw_parse_url_bucket(const string& bucket, const string& auth_tenant, string &tenant_name, string &bucket_name); struct RGWBucketCompleteInfo { RGWBucketInfo info; map attrs; void dump(Formatter *f) const; void decode_json(JSONObj *obj); }; class RGWBucketEntryMetadataObject : public RGWMetadataObject { RGWBucketEntryPoint ep; public: RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, real_time m) : ep(_ep) { objv = v; mtime = m; } void dump(Formatter *f) const override { ep.dump(f); } }; class RGWBucketInstanceMetadataObject : public RGWMetadataObject { RGWBucketCompleteInfo info; public: RGWBucketInstanceMetadataObject() {} RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, real_time m) : info(i) { objv = v; mtime = m; } void dump(Formatter *f) const override { info.dump(f); } void decode_json(JSONObj *obj) { info.decode_json(obj); } RGWBucketInfo& get_bucket_info() { return info.info; } }; /** * Store a list of the user's buckets, with associated functinos. */ class RGWUserBuckets { std::map buckets; public: RGWUserBuckets() = default; RGWUserBuckets(RGWUserBuckets&&) = default; RGWUserBuckets& operator=(const RGWUserBuckets&) = default; void encode(bufferlist& bl) const { ::encode(buckets, bl); } void decode(bufferlist::iterator& bl) { ::decode(buckets, bl); } /** * Check if the user owns a bucket by the given name. */ bool owns(string& name) { map::iterator iter; iter = buckets.find(name); return (iter != buckets.end()); } /** * Add a (created) bucket to the user's bucket list. */ void add(const RGWBucketEnt& bucket) { buckets[bucket.bucket.name] = bucket; } /** * Remove a bucket from the user's list by name. */ void remove(string& name) { map::iterator iter; iter = buckets.find(name); if (iter != buckets.end()) { buckets.erase(iter); } } /** * Get the user's buckets as a map. */ map& get_buckets() { return buckets; } /** * Cleanup data structure */ void clear() { buckets.clear(); } size_t count() { return buckets.size(); } }; WRITE_CLASS_ENCODER(RGWUserBuckets) class RGWMetadataManager; extern void rgw_bucket_init(RGWMetadataManager *mm); /** * Get all the buckets owned by a user and fill up an RGWUserBuckets with them. * Returns: 0 on success, -ERR# on failure. */ extern int rgw_read_user_buckets(RGWRados *store, const rgw_user& user_id, RGWUserBuckets& buckets, const string& marker, const string& end_marker, uint64_t max, bool need_stats, bool* is_truncated, uint64_t default_amount = 1000); extern int rgw_link_bucket(RGWRados* store, const rgw_user& user_id, rgw_bucket& bucket, ceph::real_time creation_time, bool update_entrypoint = true); extern int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint = true); extern int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key); extern int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children); extern int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket, int concurrent_max); extern int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info, map& attrs, RGWObjVersionTracker *objv_tracker); extern void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id, bool fix); struct RGWBucketAdminOpState { rgw_user uid; std::string display_name; std::string bucket_name; std::string bucket_id; std::string object_name; bool list_buckets; bool stat_buckets; bool check_objects; bool fix_index; bool delete_child_objects; bool bucket_stored; int max_aio; rgw_bucket bucket; void set_fetch_stats(bool value) { stat_buckets = value; } void set_check_objects(bool value) { check_objects = value; } void set_fix_index(bool value) { fix_index = value; } void set_delete_children(bool value) { delete_child_objects = value; } void set_max_aio(int value) { max_aio = value; } void set_user_id(const rgw_user& user_id) { if (!user_id.empty()) uid = user_id; } void set_bucket_name(const std::string& bucket_str) { bucket_name = bucket_str; } void set_object(std::string& object_str) { object_name = object_str; } rgw_user& get_user_id() { return uid; } std::string& get_user_display_name() { return display_name; } std::string& get_bucket_name() { return bucket_name; } std::string& get_object_name() { return object_name; } rgw_bucket& get_bucket() { return bucket; } void set_bucket(rgw_bucket& _bucket) { bucket = _bucket; bucket_stored = true; } void set_bucket_id(const string& bi) { bucket_id = bi; } const string& get_bucket_id() { return bucket_id; } bool will_fetch_stats() { return stat_buckets; } bool will_fix_index() { return fix_index; } bool will_delete_children() { return delete_child_objects; } bool will_check_objects() { return check_objects; } bool is_user_op() { return !uid.empty(); } bool is_system_op() { return uid.empty(); } bool has_bucket_stored() { return bucket_stored; } int get_max_aio() { return max_aio; } RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false), fix_index(false), delete_child_objects(false), bucket_stored(false) {} }; /* * A simple wrapper class for administrative bucket operations */ class RGWBucket { RGWUserBuckets buckets; RGWRados *store; RGWAccessHandle handle; RGWUserInfo user_info; std::string tenant; std::string bucket_name; bool failure; RGWBucketInfo bucket_info; public: RGWBucket() : store(NULL), handle(NULL), failure(false) {} int init(RGWRados *storage, RGWBucketAdminOpState& op_state); int check_bad_index_multipart(RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher, std::string *err_msg = NULL); int check_object_index(RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher, std::string *err_msg = NULL); int check_index(RGWBucketAdminOpState& op_state, map& existing_stats, map& calculated_stats, std::string *err_msg = NULL); int remove(RGWBucketAdminOpState& op_state, bool bypass_gc = false, bool keep_index_consistent = true, std::string *err_msg = NULL); int link(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); int unlink(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); int remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); int policy_bl_to_stream(bufferlist& bl, ostream& o); int get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy); void clear_failure() { failure = false; } }; class RGWBucketAdminOp { public: static int get_policy(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher); static int get_policy(RGWRados *store, RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy); static int dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state, ostream& os); static int unlink(RGWRados *store, RGWBucketAdminOpState& op_state); static int link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err_msg = NULL); static int check_index(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher); static int remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state, bool bypass_gc = false, bool keep_index_consistent = true); static int remove_object(RGWRados *store, RGWBucketAdminOpState& op_state); static int info(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher); static int limit_check(RGWRados *store, RGWBucketAdminOpState& op_state, const std::list& user_ids, RGWFormatterFlusher& flusher, bool warnings_only = false); }; enum DataLogEntityType { ENTITY_TYPE_UNKNOWN = 0, ENTITY_TYPE_BUCKET = 1, }; struct rgw_data_change { DataLogEntityType entity_type; string key; real_time timestamp; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); uint8_t t = (uint8_t)entity_type; ::encode(t, bl); ::encode(key, bl); ::encode(timestamp, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { DECODE_START(1, bl); uint8_t t; ::decode(t, bl); entity_type = (DataLogEntityType)t; ::decode(key, bl); ::decode(timestamp, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; void decode_json(JSONObj *obj); }; WRITE_CLASS_ENCODER(rgw_data_change) struct rgw_data_change_log_entry { string log_id; real_time log_timestamp; rgw_data_change entry; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); ::encode(log_id, bl); ::encode(log_timestamp, bl); ::encode(entry, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { DECODE_START(1, bl); ::decode(log_id, bl); ::decode(log_timestamp, bl); ::decode(entry, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; void decode_json(JSONObj *obj); }; WRITE_CLASS_ENCODER(rgw_data_change_log_entry) struct RGWDataChangesLogInfo { string marker; real_time last_update; void dump(Formatter *f) const; void decode_json(JSONObj *obj); }; class RGWDataChangesLog { CephContext *cct; RGWRados *store; int num_shards; string *oids; Mutex lock; RWLock modified_lock; map > modified_shards; std::atomic down_flag = { false }; struct ChangeStatus { real_time cur_expiration; real_time cur_sent; bool pending; RefCountedCond *cond; Mutex *lock; ChangeStatus() : pending(false), cond(NULL) { lock = new Mutex("RGWDataChangesLog::ChangeStatus"); } ~ChangeStatus() { delete lock; } }; typedef ceph::shared_ptr ChangeStatusPtr; lru_map changes; map cur_cycle; void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); void register_renew(rgw_bucket_shard& bs); void update_renewed(rgw_bucket_shard& bs, real_time& expiration); class ChangesRenewThread : public Thread { CephContext *cct; RGWDataChangesLog *log; Mutex lock; Cond cond; public: ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread::lock") {} void *entry() override; void stop(); }; ChangesRenewThread *renew_thread; public: RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"), changes(cct->_conf->rgw_data_log_changes_size) { num_shards = cct->_conf->rgw_data_log_num_shards; oids = new string[num_shards]; string prefix = cct->_conf->rgw_data_log_obj_prefix; if (prefix.empty()) { prefix = "data_log"; } for (int i = 0; i < num_shards; i++) { char buf[16]; snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i); oids[i] = buf; } renew_thread = new ChangesRenewThread(cct, this); renew_thread->create("rgw_dt_lg_renew"); } ~RGWDataChangesLog(); int choose_oid(const rgw_bucket_shard& bs); const std::string& get_oid(int shard_id) const { return oids[shard_id]; } int add_entry(rgw_bucket& bucket, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); int renew_entries(); int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries, list& entries, const string& marker, string *out_marker, bool *truncated); int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time, const string& start_marker, const string& end_marker); int trim_entries(const real_time& start_time, const real_time& end_time, const string& start_marker, const string& end_marker); int get_info(int shard_id, RGWDataChangesLogInfo *info); int lock_exclusive(int shard_id, timespan duration, string& zone_id, string& owner_id) { return store->lock_exclusive(store->get_zone_params().log_pool, oids[shard_id], duration, zone_id, owner_id); } int unlock(int shard_id, string& zone_id, string& owner_id) { return store->unlock(store->get_zone_params().log_pool, oids[shard_id], zone_id, owner_id); } struct LogMarker { int shard; string marker; LogMarker() : shard(0) {} }; int list_entries(const real_time& start_time, const real_time& end_time, int max_entries, list& entries, LogMarker& marker, bool *ptruncated); void mark_modified(int shard_id, const rgw_bucket_shard& bs); void read_clear_modified(map > &modified); bool going_down(); }; #endif