--- /dev/null
+#ifndef CEPH_RGW_CR_RADOS_H
+#define CEPH_RGW_CR_RADOS_H
+
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+#include "rgw_coroutine.h"
+#include "rgw_rados.h"
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+#include <atomic>
+
+class RGWAsyncRadosRequest : public RefCountedObject {
+ RGWCoroutine *caller;
+ RGWAioCompletionNotifier *notifier;
+
+ int retcode;
+
+ Mutex lock;
+
+protected:
+ virtual int _send_request() = 0;
+public:
+ RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
+ lock("RGWAsyncRadosRequest::lock") {
+ }
+ ~RGWAsyncRadosRequest() override {
+ if (notifier) {
+ notifier->put();
+ }
+ }
+
+ void send_request() {
+ get();
+ retcode = _send_request();
+ {
+ Mutex::Locker l(lock);
+ if (notifier) {
+ notifier->cb(); // drops its own ref
+ notifier = nullptr;
+ }
+ }
+ put();
+ }
+
+ int get_ret_status() { return retcode; }
+
+ void finish() {
+ {
+ Mutex::Locker l(lock);
+ if (notifier) {
+ // we won't call notifier->cb() to drop its ref, so drop it here
+ notifier->put();
+ notifier = nullptr;
+ }
+ }
+ put();
+ }
+};
+
+
+class RGWAsyncRadosProcessor {
+ deque<RGWAsyncRadosRequest *> m_req_queue;
+ std::atomic<bool> going_down = { false };
+protected:
+ RGWRados *store;
+ ThreadPool m_tp;
+ Throttle req_throttle;
+
+ struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
+ RGWAsyncRadosProcessor *processor;
+ RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
+
+ bool _enqueue(RGWAsyncRadosRequest *req) override;
+ void _dequeue(RGWAsyncRadosRequest *req) override {
+ ceph_abort();
+ }
+ bool _empty() override;
+ RGWAsyncRadosRequest *_dequeue() override;
+ using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
+ void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
+ void _dump_queue();
+ void _clear() override {
+ assert(processor->m_req_queue.empty());
+ }
+ } req_wq;
+
+public:
+ RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
+ ~RGWAsyncRadosProcessor() {}
+ void start();
+ void stop();
+ void handle_request(RGWAsyncRadosRequest *req);
+ void queue(RGWAsyncRadosRequest *req);
+
+ bool is_going_down() {
+ return going_down;
+ }
+};
+
+
+class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjectCtx *obj_ctx;
+ RGWRados::SystemObject::Read::GetObjState read_state;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_raw_obj obj;
+ bufferlist *pbl;
+ map<string, bufferlist> *pattrs;
+ off_t ofs;
+ off_t end;
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ bufferlist *_pbl, off_t _ofs, off_t _end);
+ void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
+};
+
+class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_raw_obj obj;
+ bool exclusive;
+ bufferlist bl;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
+ bool _exclusive, bufferlist& _bl);
+};
+
+class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_raw_obj obj;
+ map<string, bufferlist> *attrs;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ map<string, bufferlist> *_attrs);
+};
+
+class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ rgw_raw_obj obj;
+ string lock_name;
+ string cookie;
+ uint32_t duration_secs;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ const string& _name, const string& _cookie, uint32_t _duration_secs);
+};
+
+class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ rgw_raw_obj obj;
+ string lock_name;
+ string cookie;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ const string& _name, const string& _cookie);
+};
+
+template <class T>
+class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWObjectCtx obj_ctx;
+ bufferlist bl;
+
+ rgw_raw_obj obj;
+
+ map<string, bufferlist> *pattrs{nullptr};
+
+ T *result;
+ /// on ENOENT, call handle_data() with an empty object instead of failing
+ const bool empty_on_enoent;
+ RGWObjVersionTracker *objv_tracker;
+
+ RGWAsyncGetSystemObj *req{nullptr};
+
+public:
+ RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ T *_result, bool empty_on_enoent = true,
+ RGWObjVersionTracker *objv_tracker = nullptr)
+ : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+ obj_ctx(store), obj(_obj), result(_result),
+ empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
+ ~RGWSimpleRadosReadCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override;
+ int request_complete() override;
+
+ virtual int handle_data(T& data) {
+ return 0;
+ }
+};
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::send_request()
+{
+ req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
+ store, &obj_ctx, objv_tracker,
+ obj,
+ &bl, 0, -1);
+ if (pattrs) {
+ req->set_read_attrs(pattrs);
+ }
+ async_rados->queue(req);
+ return 0;
+}
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::request_complete()
+{
+ int ret = req->get_ret_status();
+ retcode = ret;
+ if (ret == -ENOENT && empty_on_enoent) {
+ *result = T();
+ } else {
+ if (ret < 0) {
+ return ret;
+ }
+ try {
+ bufferlist::iterator iter = bl.begin();
+ if (iter.end()) {
+ // allow successful reads with empty buffers. ReadSyncStatus coroutines
+ // depend on this to be able to read without locking, because the
+ // cls lock from InitSyncStatus will create an empty object if it didnt
+ // exist
+ *result = T();
+ } else {
+ ::decode(*result, iter);
+ }
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ }
+
+ return handle_data(*result);
+}
+
+class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWObjectCtx obj_ctx;
+ bufferlist bl;
+
+ rgw_raw_obj obj;
+
+ map<string, bufferlist> *pattrs;
+
+ RGWAsyncGetSystemObj *req;
+
+public:
+ RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ obj_ctx(store),
+ obj(_obj),
+ pattrs(_pattrs),
+ req(NULL) { }
+ ~RGWSimpleRadosReadAttrsCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override;
+ int request_complete() override;
+};
+
+template <class T>
+class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ bufferlist bl;
+
+ rgw_raw_obj obj;
+ RGWObjVersionTracker *objv_tracker;
+
+ RGWAsyncPutSystemObj *req{nullptr};
+
+public:
+ RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
+ : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
+ store(_store), obj(_obj), objv_tracker(objv_tracker) {
+ ::encode(_data, bl);
+ }
+
+ ~RGWSimpleRadosWriteCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
+ store, objv_tracker, obj, false, bl);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+
+ rgw_raw_obj obj;
+
+ map<string, bufferlist> attrs;
+
+ RGWAsyncPutSystemObjAttrs *req;
+
+public:
+ RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ obj(_obj),
+ attrs(_attrs), req(NULL) {
+ }
+ ~RGWSimpleRadosWriteAttrsCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
+ store, NULL, obj, &attrs);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+ map<string, bufferlist> entries;
+
+ rgw_rados_ref ref;
+
+ rgw_raw_obj obj;
+
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+ RGWRadosSetOmapKeysCR(RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ map<string, bufferlist>& _entries);
+
+ int send_request() override;
+ int request_complete() override;
+};
+
+class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+
+ string marker;
+ map<string, bufferlist> *entries;
+ int max_entries;
+
+ int rval;
+ rgw_rados_ref ref;
+
+ rgw_raw_obj obj;
+
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+ RGWRadosGetOmapKeysCR(RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ const string& _marker,
+ map<string, bufferlist> *_entries, int _max_entries);
+
+ int send_request() override;
+
+ int request_complete() override {
+ return rval;
+ }
+};
+
+class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+
+ rgw_rados_ref ref;
+
+ set<string> keys;
+
+ rgw_raw_obj obj;
+
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+ RGWRadosRemoveOmapKeysCR(RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ const set<string>& _keys);
+
+ int send_request() override;
+
+ int request_complete() override;
+};
+
+class RGWRadosRemoveCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+ librados::IoCtx ioctx;
+ const rgw_raw_obj obj;
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+ RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
+
+ int send_request();
+ int request_complete();
+};
+
+class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string lock_name;
+ string cookie;
+ uint32_t duration;
+
+ rgw_raw_obj obj;
+
+ RGWAsyncLockSystemObj *req;
+
+public:
+ RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ const string& _lock_name,
+ const string& _cookie,
+ uint32_t _duration);
+ ~RGWSimpleRadosLockCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override;
+
+ int send_request() override;
+ int request_complete() override;
+
+ static std::string gen_random_cookie(CephContext* cct) {
+#define COOKIE_LEN 16
+ char buf[COOKIE_LEN + 1];
+ gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
+ return buf;
+ }
+};
+
+class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string lock_name;
+ string cookie;
+
+ rgw_raw_obj obj;
+
+ RGWAsyncUnlockSystemObj *req;
+
+public:
+ RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ const string& _lock_name,
+ const string& _cookie);
+ ~RGWSimpleRadosUnlockCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override;
+
+ int send_request() override;
+ int request_complete() override;
+};
+
+#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
+
+class RGWOmapAppend : public RGWConsumerCR<string> {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+
+ rgw_raw_obj obj;
+
+ bool going_down;
+
+ int num_pending_entries;
+ list<string> pending_entries;
+
+ map<string, bufferlist> entries;
+
+ uint64_t window_size;
+ uint64_t total_entries;
+public:
+ RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
+ int operate() override;
+ void flush_pending();
+ bool append(const string& s);
+ bool finish();
+
+ uint64_t get_total_entries() {
+ return total_entries;
+ }
+
+ const rgw_raw_obj& get_obj() {
+ return obj;
+ }
+};
+
+class RGWAsyncWait : public RGWAsyncRadosRequest {
+ CephContext *cct;
+ Mutex *lock;
+ Cond *cond;
+ utime_t interval;
+protected:
+ int _send_request() override {
+ Mutex::Locker l(*lock);
+ return cond->WaitInterval(*lock, interval);
+ }
+public:
+ RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
+ Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
+ cct(_cct),
+ lock(_lock), cond(_cond), interval(_secs, 0) {}
+
+ void wakeup() {
+ Mutex::Locker l(*lock);
+ cond->Signal();
+ }
+};
+
+class RGWWaitCR : public RGWSimpleCoroutine {
+ CephContext *cct;
+ RGWAsyncRadosProcessor *async_rados;
+ Mutex *lock;
+ Cond *cond;
+ int secs;
+
+ RGWAsyncWait *req;
+
+public:
+ RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
+ Mutex *_lock, Cond *_cond,
+ int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
+ async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
+ }
+ ~RGWWaitCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ wakeup();
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+
+ void wakeup() {
+ req->wakeup();
+ }
+};
+
+class RGWShardedOmapCRManager {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWCoroutine *op;
+
+ int num_shards;
+
+ vector<RGWOmapAppend *> shards;
+public:
+ RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
+ : async_rados(_async_rados),
+ store(_store), op(_op), num_shards(_num_shards) {
+ shards.reserve(num_shards);
+ for (int i = 0; i < num_shards; ++i) {
+ char buf[oid_prefix.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
+ RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
+ shard->get();
+ shards.push_back(shard);
+ op->spawn(shard, false);
+ }
+ }
+
+ ~RGWShardedOmapCRManager() {
+ for (auto shard : shards) {
+ shard->put();
+ }
+ }
+
+ bool append(const string& entry, int shard_id) {
+ return shards[shard_id]->append(entry);
+ }
+ bool finish() {
+ bool success = true;
+ for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
+ success &= ((*iter)->finish() && (!(*iter)->is_error()));
+ }
+ return success;
+ }
+
+ uint64_t get_total_entries(int shard_id) {
+ return shards[shard_id]->get_total_entries();
+ }
+};
+
+class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ rgw_bucket bucket;
+ RGWBucketInfo *bucket_info;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
+ RGWRados *_store, const rgw_bucket& bucket,
+ RGWBucketInfo *_bucket_info)
+ : RGWAsyncRadosRequest(caller, cn), store(_store),
+ bucket(bucket), bucket_info(_bucket_info) {}
+};
+
+class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ rgw_bucket bucket;
+ RGWBucketInfo *bucket_info;
+
+ RGWAsyncGetBucketInstanceInfo *req;
+
+public:
+ RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
+ : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+ bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
+ ~RGWGetBucketInstanceInfoCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
+ async_rados->queue(req);
+ return 0;
+ }
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+ uint64_t versioned_epoch;
+
+ real_time src_mtime;
+
+ bool copy_if_newer;
+ rgw_zone_set *zones_trace;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ uint64_t _versioned_epoch,
+ bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ versioned_epoch(_versioned_epoch),
+ copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
+};
+
+class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
+ CephContext *cct;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+ uint64_t versioned_epoch;
+
+ real_time src_mtime;
+
+ bool copy_if_newer;
+
+ RGWAsyncFetchRemoteObj *req;
+ rgw_zone_set *zones_trace;
+
+public:
+ RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ uint64_t _versioned_epoch,
+ bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ versioned_epoch(_versioned_epoch),
+ copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
+
+
+ ~RGWFetchRemoteObjCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
+ key, versioned_epoch, copy_if_newer, zones_trace);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+
+ ceph::real_time *pmtime;
+ uint64_t *psize;
+ map<string, bufferlist> *pattrs;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ ceph::real_time *_pmtime,
+ uint64_t *_psize,
+ map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ pmtime(_pmtime),
+ psize(_psize),
+ pattrs(_pattrs) {}
+};
+
+class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
+ CephContext *cct;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+
+ ceph::real_time *pmtime;
+ uint64_t *psize;
+ map<string, bufferlist> *pattrs;
+
+ RGWAsyncStatRemoteObj *req;
+
+public:
+ RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ ceph::real_time *_pmtime,
+ uint64_t *_psize,
+ map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ pmtime(_pmtime),
+ psize(_psize),
+ pattrs(_pattrs),
+ req(NULL) {}
+
+
+ ~RGWStatRemoteObjCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
+ bucket_info, key, pmtime, psize, pattrs);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+ string owner;
+ string owner_display_name;
+ bool versioned;
+ uint64_t versioned_epoch;
+ string marker_version_id;
+
+ bool del_if_older;
+ ceph::real_time timestamp;
+ rgw_zone_set *zones_trace;
+
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ const string& _owner,
+ const string& _owner_display_name,
+ bool _versioned,
+ uint64_t _versioned_epoch,
+ bool _delete_marker,
+ bool _if_older,
+ real_time& _timestamp,
+ rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ owner(_owner),
+ owner_display_name(_owner_display_name),
+ versioned(_versioned),
+ versioned_epoch(_versioned_epoch),
+ del_if_older(_if_older),
+ timestamp(_timestamp), zones_trace(_zones_trace) {
+ if (_delete_marker) {
+ marker_version_id = key.instance;
+ }
+ }
+};
+
+class RGWRemoveObjCR : public RGWSimpleCoroutine {
+ CephContext *cct;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ rgw_obj_key key;
+ bool versioned;
+ uint64_t versioned_epoch;
+ bool delete_marker;
+ string owner;
+ string owner_display_name;
+
+ bool del_if_older;
+ real_time timestamp;
+
+ RGWAsyncRemoveObj *req;
+
+ rgw_zone_set *zones_trace;
+
+public:
+ RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const rgw_obj_key& _key,
+ bool _versioned,
+ uint64_t _versioned_epoch,
+ string *_owner,
+ string *_owner_display_name,
+ bool _delete_marker,
+ real_time *_timestamp,
+ rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ key(_key),
+ versioned(_versioned),
+ versioned_epoch(_versioned_epoch),
+ delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
+ del_if_older = (_timestamp != NULL);
+ if (_timestamp) {
+ timestamp = *_timestamp;
+ }
+
+ if (_owner) {
+ owner = *_owner;
+ }
+
+ if (_owner_display_name) {
+ owner_display_name = *_owner_display_name;
+ }
+ }
+ ~RGWRemoveObjCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
+ key, owner, owner_display_name, versioned, versioned_epoch,
+ delete_marker, del_if_older, timestamp, zones_trace);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+class RGWContinuousLeaseCR : public RGWCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+
+ const rgw_raw_obj obj;
+
+ const string lock_name;
+ const string cookie;
+
+ int interval;
+
+ Mutex lock;
+ std::atomic<bool> going_down = { false };
+ bool locked{false};
+
+ RGWCoroutine *caller;
+
+ bool aborted{false};
+
+public:
+ RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const rgw_raw_obj& _obj,
+ const string& _lock_name, int _interval, RGWCoroutine *_caller)
+ : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+ obj(_obj), lock_name(_lock_name),
+ cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+ interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
+ {}
+
+ int operate() override;
+
+ bool is_locked() {
+ Mutex::Locker l(lock);
+ return locked;
+ }
+
+ void set_locked(bool status) {
+ Mutex::Locker l(lock);
+ locked = status;
+ }
+
+ void go_down() {
+ going_down = true;
+ wakeup();
+ }
+
+ void abort() {
+ aborted = true;
+ }
+};
+
+class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+ list<cls_log_entry> entries;
+
+ string oid;
+
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
+public:
+ RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
+ const cls_log_entry& entry);
+
+ int send_request() override;
+ int request_complete() override;
+};
+
+class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+ protected:
+ std::string oid;
+ real_time start_time;
+ real_time end_time;
+ std::string from_marker;
+ std::string to_marker;
+
+ public:
+ RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
+ const real_time& start_time, const real_time& end_time,
+ const std::string& from_marker,
+ const std::string& to_marker);
+
+ int send_request() override;
+ int request_complete() override;
+};
+
+// wrapper to update last_trim_marker on success
+class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
+ CephContext *cct;
+ std::string *last_trim_marker;
+ public:
+ RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
+ const std::string& to_marker, std::string *last_trim_marker);
+ int request_complete() override;
+};
+
+class RGWAsyncStatObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWBucketInfo bucket_info;
+ rgw_obj obj;
+ uint64_t *psize;
+ real_time *pmtime;
+ uint64_t *pepoch;
+ RGWObjVersionTracker *objv_tracker;
+protected:
+ int _send_request() override;
+public:
+ RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
+ const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
+ real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
+ RGWObjVersionTracker *objv_tracker = nullptr)
+ : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
+ pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
+};
+
+class RGWStatObjCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWBucketInfo bucket_info;
+ rgw_obj obj;
+ uint64_t *psize;
+ real_time *pmtime;
+ uint64_t *pepoch;
+ RGWObjVersionTracker *objv_tracker;
+ RGWAsyncStatObj *req = nullptr;
+ public:
+ RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
+ const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
+ real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
+ RGWObjVersionTracker *objv_tracker = nullptr);
+ ~RGWStatObjCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override;
+
+ int send_request() override;
+ int request_complete() override;
+};
+
+#endif