initial code repo
[stor4nfv.git] / src / ceph / src / rgw / rgw_cr_rados.h
diff --git a/src/ceph/src/rgw/rgw_cr_rados.h b/src/ceph/src/rgw/rgw_cr_rados.h
new file mode 100644 (file)
index 0000000..a82bb35
--- /dev/null
@@ -0,0 +1,1157 @@
+#ifndef CEPH_RGW_CR_RADOS_H
+#define CEPH_RGW_CR_RADOS_H
+
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+#include "rgw_coroutine.h"
+#include "rgw_rados.h"
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+#include <atomic>
+
+class RGWAsyncRadosRequest : public RefCountedObject {
+  RGWCoroutine *caller;
+  RGWAioCompletionNotifier *notifier;
+
+  int retcode;
+
+  Mutex lock;
+
+protected:
+  virtual int _send_request() = 0;
+public:
+  RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
+                                                                               lock("RGWAsyncRadosRequest::lock") {
+  }
+  ~RGWAsyncRadosRequest() override {
+    if (notifier) {
+      notifier->put();
+    }
+  }
+
+  void send_request() {
+    get();
+    retcode = _send_request();
+    {
+      Mutex::Locker l(lock);
+      if (notifier) {
+        notifier->cb(); // drops its own ref
+        notifier = nullptr;
+      }
+    }
+    put();
+  }
+
+  int get_ret_status() { return retcode; }
+
+  void finish() {
+    {
+      Mutex::Locker l(lock);
+      if (notifier) {
+        // we won't call notifier->cb() to drop its ref, so drop it here
+        notifier->put();
+        notifier = nullptr;
+      }
+    }
+    put();
+  }
+};
+
+
+class RGWAsyncRadosProcessor {
+  deque<RGWAsyncRadosRequest *> m_req_queue;
+  std::atomic<bool> going_down = { false };
+protected:
+  RGWRados *store;
+  ThreadPool m_tp;
+  Throttle req_throttle;
+
+  struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
+    RGWAsyncRadosProcessor *processor;
+    RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
+
+    bool _enqueue(RGWAsyncRadosRequest *req) override;
+    void _dequeue(RGWAsyncRadosRequest *req) override {
+      ceph_abort();
+    }
+    bool _empty() override;
+    RGWAsyncRadosRequest *_dequeue() override;
+    using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
+    void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
+    void _dump_queue();
+    void _clear() override {
+      assert(processor->m_req_queue.empty());
+    }
+  } req_wq;
+
+public:
+  RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
+  ~RGWAsyncRadosProcessor() {}
+  void start();
+  void stop();
+  void handle_request(RGWAsyncRadosRequest *req);
+  void queue(RGWAsyncRadosRequest *req);
+
+  bool is_going_down() {
+    return going_down;
+  }
+};
+
+
+class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWObjectCtx *obj_ctx;
+  RGWRados::SystemObject::Read::GetObjState read_state;
+  RGWObjVersionTracker *objv_tracker;
+  rgw_raw_obj obj;
+  bufferlist *pbl;
+  map<string, bufferlist> *pattrs;
+  off_t ofs;
+  off_t end;
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+                       RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+                       bufferlist *_pbl, off_t _ofs, off_t _end);
+  void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
+};
+
+class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWObjVersionTracker *objv_tracker;
+  rgw_raw_obj obj;
+  bool exclusive;
+  bufferlist bl;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                       RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
+                       bool _exclusive, bufferlist& _bl);
+};
+
+class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWObjVersionTracker *objv_tracker;
+  rgw_raw_obj obj;
+  map<string, bufferlist> *attrs;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                       RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+                       map<string, bufferlist> *_attrs);
+};
+
+class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  rgw_raw_obj obj;
+  string lock_name;
+  string cookie;
+  uint32_t duration_secs;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                        RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+                       const string& _name, const string& _cookie, uint32_t _duration_secs);
+};
+
+class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  rgw_raw_obj obj;
+  string lock_name;
+  string cookie;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                        RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+                       const string& _name, const string& _cookie);
+};
+
+template <class T>
+class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWObjectCtx obj_ctx;
+  bufferlist bl;
+
+  rgw_raw_obj obj;
+
+  map<string, bufferlist> *pattrs{nullptr};
+
+  T *result;
+  /// on ENOENT, call handle_data() with an empty object instead of failing
+  const bool empty_on_enoent;
+  RGWObjVersionTracker *objv_tracker;
+
+  RGWAsyncGetSystemObj *req{nullptr};
+
+public:
+  RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     T *_result, bool empty_on_enoent = true,
+                     RGWObjVersionTracker *objv_tracker = nullptr)
+    : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+      obj_ctx(store), obj(_obj), result(_result),
+      empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
+  ~RGWSimpleRadosReadCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override;
+  int request_complete() override;
+
+  virtual int handle_data(T& data) {
+    return 0;
+  }
+};
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::send_request()
+{
+  req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
+                                store, &obj_ctx, objv_tracker,
+                                obj,
+                                &bl, 0, -1);
+  if (pattrs) {
+    req->set_read_attrs(pattrs);
+  }
+  async_rados->queue(req);
+  return 0;
+}
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::request_complete()
+{
+  int ret = req->get_ret_status();
+  retcode = ret;
+  if (ret == -ENOENT && empty_on_enoent) {
+    *result = T();
+  } else {
+    if (ret < 0) {
+      return ret;
+    }
+    try {
+      bufferlist::iterator iter = bl.begin();
+      if (iter.end()) {
+        // allow successful reads with empty buffers. ReadSyncStatus coroutines
+        // depend on this to be able to read without locking, because the
+        // cls lock from InitSyncStatus will create an empty object if it didnt
+        // exist
+        *result = T();
+      } else {
+        ::decode(*result, iter);
+      }
+    } catch (buffer::error& err) {
+      return -EIO;
+    }
+  }
+
+  return handle_data(*result);
+}
+
+class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWObjectCtx obj_ctx;
+  bufferlist bl;
+
+  rgw_raw_obj obj;
+
+  map<string, bufferlist> *pattrs;
+
+  RGWAsyncGetSystemObj *req;
+
+public:
+  RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados), store(_store),
+                                                obj_ctx(store),
+                                               obj(_obj),
+                                                pattrs(_pattrs),
+                                                req(NULL) { }
+  ~RGWSimpleRadosReadAttrsCR() override {
+    request_cleanup();
+  }
+                                                         
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override;
+  int request_complete() override;
+};
+
+template <class T>
+class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  bufferlist bl;
+
+  rgw_raw_obj obj;
+  RGWObjVersionTracker *objv_tracker;
+
+  RGWAsyncPutSystemObj *req{nullptr};
+
+public:
+  RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
+    : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
+      store(_store), obj(_obj), objv_tracker(objv_tracker) {
+    ::encode(_data, bl);
+  }
+
+  ~RGWSimpleRadosWriteCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
+                                  store, objv_tracker, obj, false, bl);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
+class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+
+  rgw_raw_obj obj;
+
+  map<string, bufferlist> attrs;
+
+  RGWAsyncPutSystemObjAttrs *req;
+
+public:
+  RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados),
+                                               store(_store),
+                                               obj(_obj),
+                                                attrs(_attrs), req(NULL) {
+  }
+  ~RGWSimpleRadosWriteAttrsCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
+                                  store, NULL, obj, &attrs);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
+class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  map<string, bufferlist> entries;
+
+  rgw_rados_ref ref;
+
+  rgw_raw_obj obj;
+
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+  RGWRadosSetOmapKeysCR(RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     map<string, bufferlist>& _entries);
+
+  int send_request() override;
+  int request_complete() override;
+};
+
+class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+
+  string marker;
+  map<string, bufferlist> *entries;
+  int max_entries;
+
+  int rval;
+  rgw_rados_ref ref;
+
+  rgw_raw_obj obj;
+
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+  RGWRadosGetOmapKeysCR(RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     const string& _marker,
+                     map<string, bufferlist> *_entries, int _max_entries);
+
+  int send_request() override;
+
+  int request_complete() override {
+    return rval;
+  }
+};
+
+class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+
+  rgw_rados_ref ref;
+
+  set<string> keys;
+
+  rgw_raw_obj obj;
+
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+  RGWRadosRemoveOmapKeysCR(RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                     const set<string>& _keys);
+
+  int send_request() override;
+
+  int request_complete() override;
+};
+
+class RGWRadosRemoveCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  librados::IoCtx ioctx;
+  const rgw_raw_obj obj;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+  RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
+
+  int send_request();
+  int request_complete();
+};
+
+class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string lock_name;
+  string cookie;
+  uint32_t duration;
+
+  rgw_raw_obj obj;
+
+  RGWAsyncLockSystemObj *req;
+
+public:
+  RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     const rgw_raw_obj& _obj,
+                      const string& _lock_name,
+                     const string& _cookie,
+                     uint32_t _duration);
+  ~RGWSimpleRadosLockCR() override {
+    request_cleanup();
+  }
+  void request_cleanup() override;
+
+  int send_request() override;
+  int request_complete() override;
+
+  static std::string gen_random_cookie(CephContext* cct) {
+#define COOKIE_LEN 16
+    char buf[COOKIE_LEN + 1];
+    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
+    return buf;
+  }
+};
+
+class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string lock_name;
+  string cookie;
+
+  rgw_raw_obj obj;
+
+  RGWAsyncUnlockSystemObj *req;
+
+public:
+  RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     const rgw_raw_obj& _obj, 
+                      const string& _lock_name,
+                     const string& _cookie);
+  ~RGWSimpleRadosUnlockCR() override {
+    request_cleanup();
+  }
+  void request_cleanup() override;
+
+  int send_request() override;
+  int request_complete() override;
+};
+
+#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
+
+class RGWOmapAppend : public RGWConsumerCR<string> {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+
+  rgw_raw_obj obj;
+
+  bool going_down;
+
+  int num_pending_entries;
+  list<string> pending_entries;
+
+  map<string, bufferlist> entries;
+
+  uint64_t window_size;
+  uint64_t total_entries;
+public:
+  RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                const rgw_raw_obj& _obj,
+                uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
+  int operate() override;
+  void flush_pending();
+  bool append(const string& s);
+  bool finish();
+
+  uint64_t get_total_entries() {
+    return total_entries;
+  }
+
+  const rgw_raw_obj& get_obj() {
+    return obj;
+  }
+};
+
+class RGWAsyncWait : public RGWAsyncRadosRequest {
+  CephContext *cct;
+  Mutex *lock;
+  Cond *cond;
+  utime_t interval;
+protected:
+  int _send_request() override {
+    Mutex::Locker l(*lock);
+    return cond->WaitInterval(*lock, interval);
+  }
+public:
+  RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
+               Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
+                                       cct(_cct),
+                                       lock(_lock), cond(_cond), interval(_secs, 0) {}
+
+  void wakeup() {
+    Mutex::Locker l(*lock);
+    cond->Signal();
+  }
+};
+
+class RGWWaitCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  Mutex *lock;
+  Cond *cond;
+  int secs;
+
+  RGWAsyncWait *req;
+
+public:
+  RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
+           Mutex *_lock, Cond *_cond,
+            int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
+                         async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
+  }
+  ~RGWWaitCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      wakeup();
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct,  lock, cond, secs);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+
+  void wakeup() {
+    req->wakeup();
+  }
+};
+
+class RGWShardedOmapCRManager {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWCoroutine *op;
+
+  int num_shards;
+
+  vector<RGWOmapAppend *> shards;
+public:
+  RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
+                      : async_rados(_async_rados),
+                       store(_store), op(_op), num_shards(_num_shards) {
+    shards.reserve(num_shards);
+    for (int i = 0; i < num_shards; ++i) {
+      char buf[oid_prefix.size() + 16];
+      snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
+      RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
+      shard->get();
+      shards.push_back(shard);
+      op->spawn(shard, false);
+    }
+  }
+
+  ~RGWShardedOmapCRManager() {
+    for (auto shard : shards) {
+      shard->put();
+    }
+  }
+
+  bool append(const string& entry, int shard_id) {
+    return shards[shard_id]->append(entry);
+  }
+  bool finish() {
+    bool success = true;
+    for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
+      success &= ((*iter)->finish() && (!(*iter)->is_error()));
+    }
+    return success;
+  }
+
+  uint64_t get_total_entries(int shard_id) {
+    return shards[shard_id]->get_total_entries();
+  }
+};
+
+class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  rgw_bucket bucket;
+  RGWBucketInfo *bucket_info;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
+                                RGWRados *_store, const rgw_bucket& bucket,
+                                RGWBucketInfo *_bucket_info)
+    : RGWAsyncRadosRequest(caller, cn), store(_store),
+      bucket(bucket), bucket_info(_bucket_info) {}
+};
+
+class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  rgw_bucket bucket;
+  RGWBucketInfo *bucket_info;
+
+  RGWAsyncGetBucketInstanceInfo *req;
+  
+public:
+  RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                             const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
+    : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+      bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
+  ~RGWGetBucketInstanceInfoCR() override {
+    request_cleanup();
+  }
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
+    async_rados->queue(req);
+    return 0;
+  }
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
+class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+  uint64_t versioned_epoch;
+
+  real_time src_mtime;
+
+  bool copy_if_newer;
+  rgw_zone_set *zones_trace;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                         const string& _source_zone,
+                         RGWBucketInfo& _bucket_info,
+                         const rgw_obj_key& _key,
+                         uint64_t _versioned_epoch,
+                         bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
+                                                      source_zone(_source_zone),
+                                                      bucket_info(_bucket_info),
+                                                      key(_key),
+                                                      versioned_epoch(_versioned_epoch),
+                                                      copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
+};
+
+class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+  uint64_t versioned_epoch;
+
+  real_time src_mtime;
+
+  bool copy_if_newer;
+
+  RGWAsyncFetchRemoteObj *req;
+  rgw_zone_set *zones_trace;
+
+public:
+  RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      const string& _source_zone,
+                      RGWBucketInfo& _bucket_info,
+                      const rgw_obj_key& _key,
+                      uint64_t _versioned_epoch,
+                      bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                                       async_rados(_async_rados), store(_store),
+                                       source_zone(_source_zone),
+                                       bucket_info(_bucket_info),
+                                       key(_key),
+                                       versioned_epoch(_versioned_epoch),
+                                       copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
+
+
+  ~RGWFetchRemoteObjCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
+                                     key, versioned_epoch, copy_if_newer, zones_trace);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
+class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+
+  ceph::real_time *pmtime;
+  uint64_t *psize;
+  map<string, bufferlist> *pattrs;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                         const string& _source_zone,
+                         RGWBucketInfo& _bucket_info,
+                         const rgw_obj_key& _key,
+                         ceph::real_time *_pmtime,
+                         uint64_t *_psize,
+                         map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
+                                                      source_zone(_source_zone),
+                                                      bucket_info(_bucket_info),
+                                                      key(_key),
+                                                      pmtime(_pmtime),
+                                                      psize(_psize),
+                                                      pattrs(_pattrs) {}
+};
+
+class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+
+  ceph::real_time *pmtime;
+  uint64_t *psize;
+  map<string, bufferlist> *pattrs;
+
+  RGWAsyncStatRemoteObj *req;
+
+public:
+  RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      const string& _source_zone,
+                      RGWBucketInfo& _bucket_info,
+                      const rgw_obj_key& _key,
+                      ceph::real_time *_pmtime,
+                      uint64_t *_psize,
+                      map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                                       async_rados(_async_rados), store(_store),
+                                       source_zone(_source_zone),
+                                       bucket_info(_bucket_info),
+                                       key(_key),
+                                       pmtime(_pmtime),
+                                       psize(_psize),
+                                       pattrs(_pattrs),
+                                       req(NULL) {}
+
+
+  ~RGWStatRemoteObjCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
+                                    bucket_info, key, pmtime, psize, pattrs);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
+class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+  string owner;
+  string owner_display_name;
+  bool versioned;
+  uint64_t versioned_epoch;
+  string marker_version_id;
+
+  bool del_if_older;
+  ceph::real_time timestamp;
+  rgw_zone_set *zones_trace;
+
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+                         const string& _source_zone,
+                         RGWBucketInfo& _bucket_info,
+                         const rgw_obj_key& _key,
+                         const string& _owner,
+                         const string& _owner_display_name,
+                         bool _versioned,
+                         uint64_t _versioned_epoch,
+                         bool _delete_marker,
+                         bool _if_older,
+                         real_time& _timestamp,
+                         rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
+                                                      source_zone(_source_zone),
+                                                      bucket_info(_bucket_info),
+                                                      key(_key),
+                                                      owner(_owner),
+                                                      owner_display_name(_owner_display_name),
+                                                      versioned(_versioned),
+                                                      versioned_epoch(_versioned_epoch),
+                                                      del_if_older(_if_older),
+                                                      timestamp(_timestamp), zones_trace(_zones_trace) {
+    if (_delete_marker) {
+      marker_version_id = key.instance;
+    }
+  }
+};
+
+class RGWRemoveObjCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  rgw_obj_key key;
+  bool versioned;
+  uint64_t versioned_epoch;
+  bool delete_marker;
+  string owner;
+  string owner_display_name;
+
+  bool del_if_older;
+  real_time timestamp;
+
+  RGWAsyncRemoveObj *req;
+  
+  rgw_zone_set *zones_trace;
+
+public:
+  RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      const string& _source_zone,
+                      RGWBucketInfo& _bucket_info,
+                      const rgw_obj_key& _key,
+                      bool _versioned,
+                      uint64_t _versioned_epoch,
+                      string *_owner,
+                      string *_owner_display_name,
+                      bool _delete_marker,
+                      real_time *_timestamp,
+                      rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                                       async_rados(_async_rados), store(_store),
+                                       source_zone(_source_zone),
+                                       bucket_info(_bucket_info),
+                                       key(_key),
+                                       versioned(_versioned),
+                                       versioned_epoch(_versioned_epoch),
+                                       delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
+    del_if_older = (_timestamp != NULL);
+    if (_timestamp) {
+      timestamp = *_timestamp;
+    }
+
+    if (_owner) {
+      owner = *_owner;
+    }
+
+    if (_owner_display_name) {
+      owner_display_name = *_owner_display_name;
+    }
+  }
+  ~RGWRemoveObjCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
+                                key, owner, owner_display_name, versioned, versioned_epoch,
+                                delete_marker, del_if_older, timestamp, zones_trace);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
+class RGWContinuousLeaseCR : public RGWCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+
+  const rgw_raw_obj obj;
+
+  const string lock_name;
+  const string cookie;
+
+  int interval;
+
+  Mutex lock;
+  std::atomic<bool> going_down = { false };
+  bool locked{false};
+
+  RGWCoroutine *caller;
+
+  bool aborted{false};
+
+public:
+  RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                       const rgw_raw_obj& _obj,
+                       const string& _lock_name, int _interval, RGWCoroutine *_caller)
+    : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+    obj(_obj), lock_name(_lock_name),
+    cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+    interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
+  {}
+
+  int operate() override;
+
+  bool is_locked() {
+    Mutex::Locker l(lock);
+    return locked;
+  }
+
+  void set_locked(bool status) {
+    Mutex::Locker l(lock);
+    locked = status;
+  }
+
+  void go_down() {
+    going_down = true;
+    wakeup();
+  }
+
+  void abort() {
+    aborted = true;
+  }
+};
+
+class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  list<cls_log_entry> entries;
+
+  string oid;
+
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+  RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
+                       const cls_log_entry& entry);
+
+  int send_request() override;
+  int request_complete() override;
+};
+
+class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+ protected:
+  std::string oid;
+  real_time start_time;
+  real_time end_time;
+  std::string from_marker;
+  std::string to_marker;
+
+ public:
+  RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
+                        const real_time& start_time, const real_time& end_time,
+                        const std::string& from_marker,
+                        const std::string& to_marker);
+
+  int send_request() override;
+  int request_complete() override;
+};
+
+// wrapper to update last_trim_marker on success
+class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
+  CephContext *cct;
+  std::string *last_trim_marker;
+ public:
+  RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
+                   const std::string& to_marker, std::string *last_trim_marker);
+  int request_complete() override;
+};
+
+class RGWAsyncStatObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWBucketInfo bucket_info;
+  rgw_obj obj;
+  uint64_t *psize;
+  real_time *pmtime;
+  uint64_t *pepoch;
+  RGWObjVersionTracker *objv_tracker;
+protected:
+  int _send_request() override;
+public:
+  RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
+                  const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
+                  real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
+                  RGWObjVersionTracker *objv_tracker = nullptr)
+         : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
+         pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
+};
+
+class RGWStatObjCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWBucketInfo bucket_info;
+  rgw_obj obj;
+  uint64_t *psize;
+  real_time *pmtime;
+  uint64_t *pepoch;
+  RGWObjVersionTracker *objv_tracker;
+  RGWAsyncStatObj *req = nullptr;
+ public:
+  RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
+         const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
+         real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
+         RGWObjVersionTracker *objv_tracker = nullptr);
+  ~RGWStatObjCR() override {
+    request_cleanup();
+  }
+  void request_cleanup() override;
+
+  int send_request() override;
+  int request_complete() override;
+};
+
+#endif