initial code repo
[stor4nfv.git] / src / ceph / src / rgw / rgw_reshard.h
diff --git a/src/ceph/src/rgw/rgw_reshard.h b/src/ceph/src/rgw/rgw_reshard.h
new file mode 100644 (file)
index 0000000..8c1734b
--- /dev/null
@@ -0,0 +1,127 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_RESHARD_H
+#define RGW_RESHARD_H
+
+#include <vector>
+#include "include/rados/librados.hpp"
+#include "cls/rgw/cls_rgw_types.h"
+#include "cls/lock/cls_lock_client.h"
+#include "rgw_bucket.h"
+
+class CephContext;
+class RGWRados;
+
+
+class RGWBucketReshard {
+  friend class RGWReshard;
+
+  RGWRados *store;
+  RGWBucketInfo bucket_info;
+  std::map<string, bufferlist> bucket_attrs;
+
+  string reshard_oid;
+  rados::cls::lock::Lock reshard_lock;
+
+  int lock_bucket();
+  void unlock_bucket();
+  int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status);
+  int clear_resharding();
+
+  int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info);
+  int do_reshard(int num_shards,
+                const RGWBucketInfo& new_bucket_info,
+                int max_entries,
+                 bool verbose,
+                 ostream *os,
+                Formatter *formatter);
+public:
+  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
+                   const std::map<string, bufferlist>& _bucket_attrs);
+
+  int execute(int num_shards, int max_op_entries,
+              bool verbose = false, ostream *out = nullptr,
+              Formatter *formatter = nullptr,
+             RGWReshard *reshard_log = nullptr);
+  int abort();
+  int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
+};
+
+class RGWReshard {
+    RGWRados *store;
+    string lock_name;
+    rados::cls::lock::Lock instance_lock;
+    int num_logshards;
+
+    bool verbose;
+    ostream *out;
+    Formatter *formatter;
+
+    void get_logshard_oid(int shard_num, string *shard);
+protected:
+  class ReshardWorker : public Thread {
+    CephContext *cct;
+    RGWReshard *reshard;
+    Mutex lock;
+    Cond cond;
+
+  public:
+    ReshardWorker(CephContext * const _cct,
+                             RGWReshard * const _reshard)
+      : cct(_cct),
+        reshard(_reshard),
+        lock("ReshardWorker") {
+    }
+
+    void *entry() override;
+    void stop();
+  };
+
+  ReshardWorker *worker = nullptr;
+  std::atomic<bool> down_flag = { false };
+
+  string get_logshard_key(const string& tenant, const string& bucket_name);
+  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
+
+public:
+  RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
+  int add(cls_rgw_reshard_entry& entry);
+  int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
+  int get(cls_rgw_reshard_entry& entry);
+  int remove(cls_rgw_reshard_entry& entry);
+  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
+  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
+
+  /* reshard thread */
+  int process_single_logshard(int logshard_num);
+  int process_all_logshards();
+  bool going_down();
+  void start_processor();
+  void stop_processor();
+};
+
+
+class RGWReshardWait {
+  RGWRados *store;
+  Mutex lock{"RGWReshardWait::lock"};
+  Cond cond;
+
+  bool going_down{false};
+
+  int do_wait();
+public:
+  RGWReshardWait(RGWRados *_store) : store(_store) {}
+  ~RGWReshardWait() {
+    assert(going_down);
+  }
+  int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
+
+  void stop() {
+    Mutex::Locker l(lock);
+    going_down = true;
+    cond.SignalAll();
+  }
+};
+
+#endif