1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RGW_BUCKET_H
5 #define CEPH_RGW_BUCKET_H
10 #include "include/types.h"
11 #include "rgw_common.h"
12 #include "rgw_tools.h"
14 #include "rgw_rados.h"
16 #include "rgw_string.h"
18 #include "common/Formatter.h"
19 #include "common/lru_map.h"
20 #include "common/ceph_time.h"
21 #include "rgw_formats.h"
23 // define as static when RGWBucket implementation complete
// Declaration only; the definition is not in this header. buckets_obj_id is a
// non-const reference, so it is presumably an output parameter -- TODO confirm.
15 extern void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id);
// Declaration of the bucket-info store helper. Visible parameters: the store,
// a bucket name, a bufferlist, an `exclusive` flag, an attrs map pointer and
// an object-version tracker pointer.
// NOTE(review): the remaining parameter(s) and the closing ");" of this
// declaration are not visible in this listing.
26 extern int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
27 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
// Bucket-instance counterpart of the declaration above; it takes a non-const
// `oid` string instead of a bucket name.
// NOTE(review): the tail of this parameter list is likewise not visible here.
29 extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferlist& bl, bool exclusive,
30 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
// Takes a bucket-instance string plus two output pointers (target instance
// string and shard id). Presumably splits a shard id off the instance string
// -- TODO confirm against the definition.
33 extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id);
// Going by the name and the pointer parameters, parses `key` into *bucket and
// *shard_id; the key format is decided by the definition, not visible here.
34 extern int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
35 rgw_bucket* bucket, int *shard_id);
// Declaration only; `entry` is taken by non-const reference.
37 extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker);
// The next two helpers take their string by non-const reference and return
// void, so the key <-> oid conversion is presumably done in place -- TODO confirm.
38 extern void rgw_bucket_instance_key_to_oid(string& key);
39 extern void rgw_bucket_instance_oid_to_key(string& oid);
// Declaration only. The bucket object is identified by tenant name and bucket
// name; the version tracker is passed by non-const reference.
41 extern int rgw_bucket_delete_bucket_obj(RGWRados *store,
42 const string& tenant_name,
43 const string& bucket_name,
44 RGWObjVersionTracker& objv_tracker);
// Two overloads of rgw_bucket_sync_user_stats: the first takes the user id and
// an RGWBucketInfo, the second identifies the bucket by tenant name and bucket
// name only.
46 extern int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info);
47 extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name);
// Returning overload, declared here and defined elsewhere: produces a bucket
// entry name from a tenant name and a bucket name. The exact format is decided
// by the definition, which is not visible here.
49 extern std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
50 const std::string& bucket_name);
// Convenience overload: stores the value returned by the two-argument overload
// above into `bucket_entry`.
// NOTE(review): the closing brace of this inline function is not visible in
// this listing.
51 static inline void rgw_make_bucket_entry_name(const string& tenant_name,
52 const string& bucket_name,
53 std::string& bucket_entry) {
54 bucket_entry = rgw_make_bucket_entry_name(tenant_name, bucket_name);
// Declaration only. Inputs are `bucket` and `auth_tenant`; `tenant_name` and
// `bucket_name` are non-const references, presumably outputs filled from the
// bucket string -- TODO confirm against the definition.
57 extern void rgw_parse_url_bucket(const string& bucket,
58 const string& auth_tenant,
59 string &tenant_name, string &bucket_name);
// Aggregate used by RGWBucketInstanceMetadataObject, which reads an `info`
// member of this struct in get_bucket_info().
// NOTE(review): that `info` member's declaration and the closing "};" are not
// visible in this listing.
61 struct RGWBucketCompleteInfo {
// Attributes keyed by string, values held as bufferlists.
63 map<string, bufferlist> attrs;
// Formatter output and JSON decoding hooks; both are defined out of line.
65 void dump(Formatter *f) const;
66 void decode_json(JSONObj *obj);
// Metadata object (derives from RGWMetadataObject) wrapping an
// RGWBucketEntryPoint.
69 class RGWBucketEntryMetadataObject : public RGWMetadataObject {
// The wrapped entry point; copy-initialized from the constructor argument.
70 RGWBucketEntryPoint ep;
// Copies _ep into `ep`.
// NOTE(review): the constructor body, where `v` and `m` would be used, is not
// visible in this listing.
72 RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, real_time m) : ep(_ep) {
// Overrides the base-class dump(); the body is not visible in this listing.
77 void dump(Formatter *f) const override {
// Metadata object (derives from RGWMetadataObject) wrapping an
// RGWBucketCompleteInfo.
82 class RGWBucketInstanceMetadataObject : public RGWMetadataObject {
// The wrapped bucket-instance record.
83 RGWBucketCompleteInfo info;
// Default construction leaves `info` default-constructed.
85 RGWBucketInstanceMetadataObject() {}
// Copies `i` into `info`.
// NOTE(review): the constructor body, where `v` and `m` would be used, is not
// visible in this listing.
86 RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, real_time m) : info(i) {
// Overrides the base-class dump(); the body is not visible in this listing.
91 void dump(Formatter *f) const override {
// Forwards JSON decoding to the wrapped RGWBucketCompleteInfo.
95 void decode_json(JSONObj *obj) {
96 info.decode_json(obj);
// Returns a mutable reference to the `info` member of the wrapped record.
99 RGWBucketInfo& get_bucket_info() { return info.info; }
103 * Store a list of the user's buckets, with associated functions.
// Bucket entries keyed by string; add() uses the bucket name as the key.
// NOTE(review): the enclosing class header is not visible in this listing.
107 std::map<std::string, RGWBucketEnt> buckets;
// Default construction, move construction and copy assignment are all
// compiler-generated.
110 RGWUserBuckets() = default;
111 RGWUserBuckets(RGWUserBuckets&&) = default;
113 RGWUserBuckets& operator=(const RGWUserBuckets&) = default;
// Serializes the `buckets` map into `bl`.
115 void encode(bufferlist& bl) const {
116 ::encode(buckets, bl);
// Mirror of encode(): reads the `buckets` map back from the iterator.
118 void decode(bufferlist::iterator& bl) {
119 ::decode(buckets, bl);
122 * Check if the user owns a bucket by the given name.
// Looks `name` up in `buckets`; returns true when an entry with that key
// exists.
124 bool owns(string& name) {
125 map<string, RGWBucketEnt>::iterator iter;
126 iter = buckets.find(name);
127 return (iter != buckets.end());
131 * Add a (created) bucket to the user's bucket list.
// Inserts the entry under bucket.bucket.name, overwriting any existing entry
// with the same key.
133 void add(const RGWBucketEnt& bucket) {
134 buckets[bucket.bucket.name] = bucket;
138 * Remove a bucket from the user's list by name.
// Looks `name` up in `buckets`.
// NOTE(review): the statement executed when the entry is found is not visible
// in this listing.
140 void remove(string& name) {
141 map<string, RGWBucketEnt>::iterator iter;
142 iter = buckets.find(name);
143 if (iter != buckets.end()) {
149 * Get the user's buckets as a map.
// Returns a mutable reference to the internal map, not a copy.
151 map<string, RGWBucketEnt>& get_buckets() { return buckets; }
154 * Cleanup data structure
// Drops every entry from `buckets`.
156 void clear() { buckets.clear(); }
// Number of entries currently held in `buckets`.
158 size_t count() { return buckets.size(); }
// Ceph macro; presumably generates the free encode()/decode() wrappers for
// this class -- TODO confirm against its definition.
160 WRITE_CLASS_ENCODER(RGWUserBuckets)
// Forward declaration so the prototype below can take an RGWMetadataManager
// pointer without the full class definition.
162 class RGWMetadataManager;
// Declaration only; what it does with the manager is decided by the
// definition, which is not visible here.
164 extern void rgw_bucket_init(RGWMetadataManager *mm);
166 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
167 * Returns: 0 on success, -ERR# on failure.
// `buckets` is the output container; `marker` and `end_marker` are passed as
// const strings and `default_amount` defaults to 1000.
// NOTE(review): the parameters between `end_marker` and `default_amount` are
// not visible in this listing.
169 extern int rgw_read_user_buckets(RGWRados *store,
170 const rgw_user& user_id,
171 RGWUserBuckets& buckets,
172 const string& marker,
173 const string& end_marker,
177 uint64_t default_amount = 1000);
// Declaration only. `update_entrypoint` defaults to true.
// NOTE(review): the parameter between `user_id` and `creation_time` is not
// visible in this listing.
179 extern int rgw_link_bucket(RGWRados* store,
180 const rgw_user& user_id,
182 ceph::real_time creation_time,
183 bool update_entrypoint = true);
// Counterpart of rgw_link_bucket; the bucket is identified by tenant name and
// bucket name, and `update_entrypoint` again defaults to true.
184 extern int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id,
185 const string& tenant_name, const string& bucket_name, bool update_entrypoint = true);
// Declarations only; the definitions are not in this header.
// rgw_remove_object identifies the object by bucket and object key.
187 extern int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key);
// Two bucket-removal entry points: one takes a `delete_children` flag, the
// other (the "bypass_gc" variant) takes a `concurrent_max` count instead.
188 extern int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children);
189 extern int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket, int concurrent_max);
// Takes the bucket info, an attrs map by non-const reference and a
// version-tracker pointer.
191 extern int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
192 map<string, bufferlist>& attrs,
193 RGWObjVersionTracker *objv_tracker);
// Returns void; `fix` presumably selects repair over report-only -- TODO
// confirm against the definition.
195 extern void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id, bool fix);
// Parameter bag for administrative bucket operations: target names/ids plus
// boolean options, with trivial setters and getters.
// NOTE(review): several member declarations used below (uid, bucket, max_aio
// and some of the flags) are not visible in this listing.
197 struct RGWBucketAdminOpState {
199 std::string display_name;
200 std::string bucket_name;
201 std::string bucket_id;
202 std::string object_name;
208 bool delete_child_objects;
// Plain option setters; each stores `value` in the corresponding member.
214 void set_fetch_stats(bool value) { stat_buckets = value; }
215 void set_check_objects(bool value) { check_objects = value; }
216 void set_fix_index(bool value) { fix_index = value; }
217 void set_delete_children(bool value) { delete_child_objects = value; }
219 void set_max_aio(int value) { max_aio = value; }
// Only acts on a non-empty user id.
// NOTE(review): the statement guarded by this `if` is not visible in this
// listing.
221 void set_user_id(const rgw_user& user_id) {
222 if (!user_id.empty())
225 void set_bucket_name(const std::string& bucket_str) {
226 bucket_name = bucket_str;
228 void set_object(std::string& object_str) {
229 object_name = object_str;
// These getters return mutable references to the stored members.
232 rgw_user& get_user_id() { return uid; }
233 std::string& get_user_display_name() { return display_name; }
234 std::string& get_bucket_name() { return bucket_name; }
235 std::string& get_object_name() { return object_name; }
237 rgw_bucket& get_bucket() { return bucket; }
// Marks the state as holding a stored bucket (bucket_stored = true).
// NOTE(review): the other statement(s) in this body are not visible here.
238 void set_bucket(rgw_bucket& _bucket) {
240 bucket_stored = true;
// NOTE(review): the body of set_bucket_id is not visible in this listing.
243 void set_bucket_id(const string& bi) {
246 const string& get_bucket_id() { return bucket_id; }
// Option queries mirroring the setters above.
248 bool will_fetch_stats() { return stat_buckets; }
249 bool will_fix_index() { return fix_index; }
250 bool will_delete_children() { return delete_child_objects; }
251 bool will_check_objects() { return check_objects; }
// An operation counts as a user op when uid is non-empty and as a system op
// otherwise; the two predicates are exact complements.
252 bool is_user_op() { return !uid.empty(); }
253 bool is_system_op() { return uid.empty(); }
254 bool has_bucket_stored() { return bucket_stored; }
255 int get_max_aio() { return max_aio; }
// Default state: every boolean option is off and no bucket has been stored.
// max_aio is zero-initialized as well, so get_max_aio() never returns an
// indeterminate value when set_max_aio() was not called first.
// NOTE(review): assumes max_aio is declared after bucket_stored; if not, move
// the initializer to match declaration order and avoid a -Wreorder warning.
257 RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false),
258 fix_index(false), delete_child_objects(false),
259 bucket_stored(false), max_aio(0) {}
263 * A simple wrapper class for administrative bucket operations
// NOTE(review): the class header and some member declarations (for example
// `store` and `failure`, which the constructor initializes) are not visible in
// this listing.
268 RGWUserBuckets buckets;
270 RGWAccessHandle handle;
272 RGWUserInfo user_info;
274 std::string bucket_name;
278 RGWBucketInfo bucket_info;
// Starts with null store and handle pointers and the failure flag cleared.
281 RGWBucket() : store(NULL), handle(NULL), failure(false) {}
// Takes the RGWRados store and the operation state; defined out of line.
282 int init(RGWRados *storage, RGWBucketAdminOpState& op_state);
// The operations below are declared here and defined elsewhere. Those with an
// `err_msg` parameter take it as a pointer that defaults to NULL.
284 int check_bad_index_multipart(RGWBucketAdminOpState& op_state,
285 RGWFormatterFlusher& flusher, std::string *err_msg = NULL);
287 int check_object_index(RGWBucketAdminOpState& op_state,
288 RGWFormatterFlusher& flusher,
289 std::string *err_msg = NULL);
// Takes two per-category stats maps by non-const reference (existing and
// calculated), presumably as outputs -- TODO confirm.
291 int check_index(RGWBucketAdminOpState& op_state,
292 map<RGWObjCategory, RGWStorageStats>& existing_stats,
293 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
294 std::string *err_msg = NULL);
// bypass_gc defaults to false and keep_index_consistent to true.
296 int remove(RGWBucketAdminOpState& op_state, bool bypass_gc = false, bool keep_index_consistent = true, std::string *err_msg = NULL);
297 int link(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL);
298 int unlink(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL);
300 int remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL);
301 int policy_bl_to_stream(bufferlist& bl, ostream& o);
302 int get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy);
// Resets the failure flag.
304 void clear_failure() { failure = false; }
// Collection of static entry points for administrative bucket operations;
// each takes the store and an RGWBucketAdminOpState. All are defined out of
// line.
// NOTE(review): the access specifier and closing "};" are not visible in this
// listing.
307 class RGWBucketAdminOp
// get_policy has two overloads: one writes through a formatter flusher, the
// other takes an RGWAccessControlPolicy by non-const reference.
310 static int get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
311 RGWFormatterFlusher& flusher);
312 static int get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
313 RGWAccessControlPolicy& policy);
// NOTE(review): the rest of this parameter list is not visible in this listing.
314 static int dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
317 static int unlink(RGWRados *store, RGWBucketAdminOpState& op_state);
318 static int link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err_msg = NULL);
320 static int check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
321 RGWFormatterFlusher& flusher);
// bypass_gc defaults to false and keep_index_consistent to true.
323 static int remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state, bool bypass_gc = false, bool keep_index_consistent = true);
324 static int remove_object(RGWRados *store, RGWBucketAdminOpState& op_state);
325 static int info(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher);
// Takes a list of user ids; warnings_only defaults to false.
326 static int limit_check(RGWRados *store, RGWBucketAdminOpState& op_state,
327 const std::list<std::string>& user_ids,
328 RGWFormatterFlusher& flusher,
329 bool warnings_only = false);
// Entity kind recorded in an rgw_data_change; rgw_data_change::encode() casts
// the value to uint8_t, so the numeric values are part of the encoding.
// NOTE(review): the closing "};" is not visible in this listing.
333 enum DataLogEntityType {
334 ENTITY_TYPE_UNKNOWN = 0,
335 ENTITY_TYPE_BUCKET = 1,
// A single data-change record.
// NOTE(review): not every member declaration or encode/decode statement is
// visible in this listing (e.g. `timestamp` is used but not declared here).
338 struct rgw_data_change {
339 DataLogEntityType entity_type;
// Versioned encoding (ENCODE_START(1, 1, ...)); the entity type goes through a
// uint8_t temporary and the timestamp is encoded afterwards.
343 void encode(bufferlist& bl) const {
344 ENCODE_START(1, 1, bl);
345 uint8_t t = (uint8_t)entity_type;
348 ::encode(timestamp, bl);
// Mirror of encode(): `t` is cast back to DataLogEntityType, then the
// timestamp is decoded.
352 void decode(bufferlist::iterator& bl) {
356 entity_type = (DataLogEntityType)t;
358 ::decode(timestamp, bl);
// Formatter output and JSON decoding hooks; both are defined out of line.
362 void dump(Formatter *f) const;
363 void decode_json(JSONObj *obj);
365 WRITE_CLASS_ENCODER(rgw_data_change)
// A logged data change: an rgw_data_change plus a log id and log timestamp.
// NOTE(review): the declaration of `log_id` and parts of the encode/decode
// bodies are not visible in this listing.
367 struct rgw_data_change_log_entry {
369 real_time log_timestamp;
370 rgw_data_change entry;
// Versioned encoding (ENCODE_START(1, 1, ...)): log_id first, then
// log_timestamp.
372 void encode(bufferlist& bl) const {
373 ENCODE_START(1, 1, bl);
374 ::encode(log_id, bl);
375 ::encode(log_timestamp, bl);
// Mirror of encode(), reading the fields in the same order.
380 void decode(bufferlist::iterator& bl) {
382 ::decode(log_id, bl);
383 ::decode(log_timestamp, bl);
// Formatter output and JSON decoding hooks; both are defined out of line.
388 void dump(Formatter *f) const;
389 void decode_json(JSONObj *obj);
391 WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
// Info record filled in by RGWDataChangesLog::get_info().
// NOTE(review): other members and the closing "};" are not visible in this
// listing.
393 struct RGWDataChangesLogInfo {
395 real_time last_update;
// Formatter output and JSON decoding hooks; both are defined out of line.
397 void dump(Formatter *f) const;
398 void decode_json(JSONObj *obj);
// Data-changes log, sharded across `num_shards` oids (see the constructor).
// NOTE(review): only part of this class is visible in this listing; several
// member declarations (cct, store, lock, num_shards, oids) are missing.
401 class RGWDataChangesLog {
// Read/write lock constructed with the name "RGWDataChangesLog::modified_lock";
// presumably guards modified_shards -- TODO confirm in the implementation.
409 RWLock modified_lock;
// Per-shard sets of strings; same shape as the map handed to
// read_clear_modified().
410 map<int, set<string> > modified_shards;
// Atomic flag, initially false.
412 std::atomic<bool> down_flag = { false };
// Per-entry bookkeeping stored in the `changes` LRU through ChangeStatusPtr.
// NOTE(review): some members (e.g. `pending`, `lock`) and the destructor are
// not visible in this listing.
414 struct ChangeStatus {
415 real_time cur_expiration;
418 RefCountedCond *cond;
// Starts with pending == false and a null cond, and allocates the mutex with a
// raw `new`.
// NOTE(review): the matching release of `lock` is not visible here -- confirm
// the destructor deletes it.
421 ChangeStatus() : pending(false), cond(NULL) {
422 lock = new Mutex("RGWDataChangesLog::ChangeStatus");
// Shared-ownership handle for ChangeStatus.
430 typedef ceph::shared_ptr<ChangeStatus> ChangeStatusPtr;
// LRU cache of change status per bucket shard; its size comes from
// rgw_data_log_changes_size in the constructor.
432 lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
// Bucket shards flagged for the current cycle; presumably filled by
// register_renew() -- TODO confirm in the implementation.
434 map<rgw_bucket_shard, bool> cur_cycle;
// Private helpers, defined out of line. _get_change hands the status back
// through the `status` reference parameter.
436 void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
437 void register_renew(rgw_bucket_shard& bs);
438 void update_renewed(rgw_bucket_shard& bs, real_time& expiration);
// Background thread type owned by RGWDataChangesLog; entry() is defined out of
// line.
// NOTE(review): the `cct` and `lock` members initialized below are not
// declared in the visible part of this listing.
440 class ChangesRenewThread : public Thread {
// Back-pointer to the owning log.
442 RGWDataChangesLog *log;
// Stores the context and owning log and names its lock
// "ChangesRenewThread::lock".
447 ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread::lock") {}
448 void *entry() override;
// The thread instance; allocated and started in the RGWDataChangesLog
// constructor.
452 ChangesRenewThread *renew_thread;
// Sets up the log: names the two locks, sizes the `changes` LRU from
// rgw_data_log_changes_size, builds one oid per shard and starts the renew
// thread.
// NOTE(review): parts of this body (the `buf` declaration, the empty-prefix
// fallback, the assignment into oids[i]) are not visible in this listing.
456 RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store),
457 lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"),
458 changes(cct->_conf->rgw_data_log_changes_size) {
459 num_shards = cct->_conf->rgw_data_log_num_shards;
// Raw array allocation, one string per shard.
// NOTE(review): confirm the out-of-line destructor releases it with delete[].
461 oids = new string[num_shards];
463 string prefix = cct->_conf->rgw_data_log_obj_prefix;
465 if (prefix.empty()) {
// Each shard name is formatted as "<prefix>.<shard index>".
469 for (int i = 0; i < num_shards; i++) {
471 snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i);
// The renew thread is created and started from inside the constructor, under
// the thread name "rgw_dt_lg_renew".
475 renew_thread = new ChangesRenewThread(cct, this);
476 renew_thread->create("rgw_dt_lg_renew");
// Destructor is defined out of line.
479 ~RGWDataChangesLog();
481 int choose_oid(const rgw_bucket_shard& bs);
// Returns the oid for a shard; shard_id is used as an index into `oids`
// without a bounds check, so callers must pass a value in [0, num_shards).
482 const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
483 int add_entry(rgw_bucket& bucket, int shard_id);
484 int get_log_shard_id(rgw_bucket& bucket, int shard_id);
// Per-shard listing bounded by a time range and max_entries; results go into
// `entries`.
// NOTE(review): the trailing parameters of this declaration are not visible in
// this listing.
486 int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
487 list<rgw_data_change_log_entry>& entries,
488 const string& marker,
// trim_entries has a per-shard overload and one without a shard id.
491 int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
492 const string& start_marker, const string& end_marker);
493 int trim_entries(const real_time& start_time, const real_time& end_time,
494 const string& start_marker, const string& end_marker);
// Takes the shard id and an RGWDataChangesLogInfo pointer; defined out of line.
495 int get_info(int shard_id, RGWDataChangesLogInfo *info);
// Thin forwarders to the store's lock_exclusive()/unlock() on the shard's oid
// in the zone's log pool. shard_id indexes `oids` without a bounds check.
496 int lock_exclusive(int shard_id, timespan duration, string& zone_id, string& owner_id) {
497 return store->lock_exclusive(store->get_zone_params().log_pool, oids[shard_id], duration, zone_id, owner_id);
499 int unlock(int shard_id, string& zone_id, string& owner_id) {
500 return store->unlock(store->get_zone_params().log_pool, oids[shard_id], zone_id, owner_id);
// Constructor of the LogMarker helper type: starts at shard 0.
// NOTE(review): the LogMarker struct header and its members are not visible in
// this listing.
506 LogMarker() : shard(0) {}
// Listing overload without a shard id; takes a LogMarker by reference and a
// `ptruncated` output pointer.
508 int list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
509 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated);
// mark_modified takes a shard id and bucket shard; read_clear_modified takes a
// map by non-const reference with the same type as `modified_shards`. Going by
// the name, it hands out the recorded set and clears it -- TODO confirm in the
// implementation.
511 void mark_modified(int shard_id, const rgw_bucket_shard& bs);
512 void read_clear_modified(map<int, set<string> > &modified);