X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_cr_rados.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_cr_rados.h;h=a82bb35b5fce8fc5e7b7a08e866ca6bed74364db;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_cr_rados.h b/src/ceph/src/rgw/rgw_cr_rados.h new file mode 100644 index 0000000..a82bb35 --- /dev/null +++ b/src/ceph/src/rgw/rgw_cr_rados.h @@ -0,0 +1,1157 @@ +#ifndef CEPH_RGW_CR_RADOS_H +#define CEPH_RGW_CR_RADOS_H + +#include +#include "include/assert.h" +#include "rgw_coroutine.h" +#include "rgw_rados.h" +#include "common/WorkQueue.h" +#include "common/Throttle.h" + +#include + +class RGWAsyncRadosRequest : public RefCountedObject { + RGWCoroutine *caller; + RGWAioCompletionNotifier *notifier; + + int retcode; + + Mutex lock; + +protected: + virtual int _send_request() = 0; +public: + RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0), + lock("RGWAsyncRadosRequest::lock") { + } + ~RGWAsyncRadosRequest() override { + if (notifier) { + notifier->put(); + } + } + + void send_request() { + get(); + retcode = _send_request(); + { + Mutex::Locker l(lock); + if (notifier) { + notifier->cb(); // drops its own ref + notifier = nullptr; + } + } + put(); + } + + int get_ret_status() { return retcode; } + + void finish() { + { + Mutex::Locker l(lock); + if (notifier) { + // we won't call notifier->cb() to drop its ref, so drop it here + notifier->put(); + notifier = nullptr; + } + } + put(); + } +}; + + +class RGWAsyncRadosProcessor { + deque m_req_queue; + std::atomic going_down = { false }; +protected: + RGWRados *store; + ThreadPool m_tp; + Throttle req_throttle; + + struct RGWWQ : public ThreadPool::WorkQueue { + RGWAsyncRadosProcessor *processor; + RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), processor(p) {} + + bool _enqueue(RGWAsyncRadosRequest *req) override; + void _dequeue(RGWAsyncRadosRequest *req) override { + ceph_abort(); + } + bool _empty() override; + RGWAsyncRadosRequest *_dequeue() override; + using ThreadPool::WorkQueue::_process; + void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override; + void _dump_queue(); + void _clear() override { + assert(processor->m_req_queue.empty()); + } + } req_wq; + +public: + RGWAsyncRadosProcessor(RGWRados *_store, int num_threads); + ~RGWAsyncRadosProcessor() {} + void start(); + void stop(); + void handle_request(RGWAsyncRadosRequest *req); + void queue(RGWAsyncRadosRequest *req); + + bool is_going_down() { + return going_down; + } +}; + + +class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + RGWObjectCtx *obj_ctx; + RGWRados::SystemObject::Read::GetObjState read_state; + RGWObjVersionTracker *objv_tracker; + rgw_raw_obj obj; + bufferlist *pbl; + map *pattrs; + off_t ofs; + off_t end; +protected: + int _send_request() override; +public: + RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + bufferlist *_pbl, off_t _ofs, off_t _end); + void set_read_attrs(map *_pattrs) { pattrs = _pattrs; } +}; + +class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + RGWObjVersionTracker *objv_tracker; + rgw_raw_obj obj; + bool exclusive; + bufferlist bl; + +protected: + int _send_request() override; +public: + RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj, + bool _exclusive, bufferlist& _bl); +}; + +class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest { + RGWRados *store; + RGWObjVersionTracker *objv_tracker; + rgw_raw_obj obj; + map *attrs; + +protected: + int _send_request() override; +public: + RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + map *_attrs); +}; + +class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_raw_obj obj; + string lock_name; + string cookie; + uint32_t duration_secs; + +protected: + int _send_request() override; +public: + RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + const string& _name, const string& _cookie, uint32_t _duration_secs); +}; + +class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_raw_obj obj; + string lock_name; + string cookie; + +protected: + int _send_request() override; +public: + RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + const string& _name, const string& _cookie); +}; + +template +class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWObjectCtx obj_ctx; + bufferlist bl; + + rgw_raw_obj obj; + + map *pattrs{nullptr}; + + T *result; + /// on ENOENT, call handle_data() with an empty object instead of failing + const bool empty_on_enoent; + RGWObjVersionTracker *objv_tracker; + + RGWAsyncGetSystemObj *req{nullptr}; + +public: + RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + T *_result, bool empty_on_enoent = true, + RGWObjVersionTracker *objv_tracker = nullptr) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + obj_ctx(store), obj(_obj), result(_result), + empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {} + ~RGWSimpleRadosReadCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override; + int request_complete() override; + + virtual int handle_data(T& data) { + return 0; + } +}; + +template +int RGWSimpleRadosReadCR::send_request() +{ + req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), + store, &obj_ctx, objv_tracker, + obj, + &bl, 0, -1); + if (pattrs) { + req->set_read_attrs(pattrs); + } + async_rados->queue(req); + return 0; +} + +template +int RGWSimpleRadosReadCR::request_complete() +{ + int ret = req->get_ret_status(); + retcode = ret; + if (ret == -ENOENT && empty_on_enoent) { + *result = T(); + } else { + if (ret < 0) { + return ret; + } + try { + bufferlist::iterator iter = bl.begin(); + if (iter.end()) { + // allow successful reads with empty buffers. ReadSyncStatus coroutines + // depend on this to be able to read without locking, because the + // cls lock from InitSyncStatus will create an empty object if it didnt + // exist + *result = T(); + } else { + ::decode(*result, iter); + } + } catch (buffer::error& err) { + return -EIO; + } + } + + return handle_data(*result); +} + +class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWObjectCtx obj_ctx; + bufferlist bl; + + rgw_raw_obj obj; + + map *pattrs; + + RGWAsyncGetSystemObj *req; + +public: + RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + map *_pattrs) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), store(_store), + obj_ctx(store), + obj(_obj), + pattrs(_pattrs), + req(NULL) { } + ~RGWSimpleRadosReadAttrsCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override; + int request_complete() override; +}; + +template +class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + bufferlist bl; + + rgw_raw_obj obj; + RGWObjVersionTracker *objv_tracker; + + RGWAsyncPutSystemObj *req{nullptr}; + +public: + RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + const T& _data, RGWObjVersionTracker *objv_tracker = nullptr) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), + store(_store), obj(_obj), objv_tracker(objv_tracker) { + ::encode(_data, bl); + } + + ~RGWSimpleRadosWriteCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(), + store, objv_tracker, obj, false, bl); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + + rgw_raw_obj obj; + + map attrs; + + RGWAsyncPutSystemObjAttrs *req; + +public: + RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + map& _attrs) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + obj(_obj), + attrs(_attrs), req(NULL) { + } + ~RGWSimpleRadosWriteAttrsCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(), + store, NULL, obj, &attrs); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine { + RGWRados *store; + map entries; + + rgw_rados_ref ref; + + rgw_raw_obj obj; + + boost::intrusive_ptr cn; + +public: + RGWRadosSetOmapKeysCR(RGWRados *_store, + const rgw_raw_obj& _obj, + map& _entries); + + int send_request() override; + int request_complete() override; +}; + +class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine { + RGWRados *store; + + string marker; + map *entries; + int max_entries; + + int rval; + rgw_rados_ref ref; + + rgw_raw_obj obj; + + boost::intrusive_ptr cn; + +public: + RGWRadosGetOmapKeysCR(RGWRados *_store, + const rgw_raw_obj& _obj, + const string& _marker, + map *_entries, int _max_entries); + + int send_request() override; + + int request_complete() override { + return rval; + } +}; + +class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine { + RGWRados *store; + + rgw_rados_ref ref; + + set keys; + + rgw_raw_obj obj; + + boost::intrusive_ptr cn; + +public: + RGWRadosRemoveOmapKeysCR(RGWRados *_store, + const rgw_raw_obj& _obj, + const set& _keys); + + int send_request() override; + + int request_complete() override; +}; + +class RGWRadosRemoveCR : public RGWSimpleCoroutine { + RGWRados *store; + librados::IoCtx ioctx; + const rgw_raw_obj obj; + boost::intrusive_ptr cn; + +public: + RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj); + + int send_request(); + int request_complete(); +}; + +class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string lock_name; + string cookie; + uint32_t duration; + + rgw_raw_obj obj; + + RGWAsyncLockSystemObj *req; + +public: + RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + const string& _lock_name, + const string& _cookie, + uint32_t _duration); + ~RGWSimpleRadosLockCR() override { + request_cleanup(); + } + void request_cleanup() override; + + int send_request() override; + int request_complete() override; + + static std::string gen_random_cookie(CephContext* cct) { +#define COOKIE_LEN 16 + char buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); + return buf; + } +}; + +class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string lock_name; + string cookie; + + rgw_raw_obj obj; + + RGWAsyncUnlockSystemObj *req; + +public: + RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + const string& _lock_name, + const string& _cookie); + ~RGWSimpleRadosUnlockCR() override { + request_cleanup(); + } + void request_cleanup() override; + + int send_request() override; + int request_complete() override; +}; + +#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100 + +class RGWOmapAppend : public RGWConsumerCR { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + + rgw_raw_obj obj; + + bool going_down; + + int num_pending_entries; + list pending_entries; + + map entries; + + uint64_t window_size; + uint64_t total_entries; +public: + RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT); + int operate() override; + void flush_pending(); + bool append(const string& s); + bool finish(); + + uint64_t get_total_entries() { + return total_entries; + } + + const rgw_raw_obj& get_obj() { + return obj; + } +}; + +class RGWAsyncWait : public RGWAsyncRadosRequest { + CephContext *cct; + Mutex *lock; + Cond *cond; + utime_t interval; +protected: + int _send_request() override { + Mutex::Locker l(*lock); + return cond->WaitInterval(*lock, interval); + } +public: + RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct, + Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn), + cct(_cct), + lock(_lock), cond(_cond), interval(_secs, 0) {} + + void wakeup() { + Mutex::Locker l(*lock); + cond->Signal(); + } +}; + +class RGWWaitCR : public RGWSimpleCoroutine { + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + Mutex *lock; + Cond *cond; + int secs; + + RGWAsyncWait *req; + +public: + RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct, + Mutex *_lock, Cond *_cond, + int _secs) : RGWSimpleCoroutine(_cct), cct(_cct), + async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) { + } + ~RGWWaitCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + wakeup(); + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } + + void wakeup() { + req->wakeup(); + } +}; + +class RGWShardedOmapCRManager { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWCoroutine *op; + + int num_shards; + + vector shards; +public: + RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix) + : async_rados(_async_rados), + store(_store), op(_op), num_shards(_num_shards) { + shards.reserve(num_shards); + for (int i = 0; i < num_shards; ++i) { + char buf[oid_prefix.size() + 16]; + snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i); + RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf)); + shard->get(); + shards.push_back(shard); + op->spawn(shard, false); + } + } + + ~RGWShardedOmapCRManager() { + for (auto shard : shards) { + shard->put(); + } + } + + bool append(const string& entry, int shard_id) { + return shards[shard_id]->append(entry); + } + bool finish() { + bool success = true; + for (vector::iterator iter = shards.begin(); iter != shards.end(); ++iter) { + success &= ((*iter)->finish() && (!(*iter)->is_error())); + } + return success; + } + + uint64_t get_total_entries(int shard_id) { + return shards[shard_id]->get_total_entries(); + } +}; + +class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_bucket bucket; + RGWBucketInfo *bucket_info; + +protected: + int _send_request() override; +public: + RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, + RGWRados *_store, const rgw_bucket& bucket, + RGWBucketInfo *_bucket_info) + : RGWAsyncRadosRequest(caller, cn), store(_store), + bucket(bucket), bucket_info(_bucket_info) {} +}; + +class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + rgw_bucket bucket; + RGWBucketInfo *bucket_info; + + RGWAsyncGetBucketInstanceInfo *req; + +public: + RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_bucket& bucket, RGWBucketInfo *_bucket_info) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + bucket(bucket), bucket_info(_bucket_info), req(NULL) {} + ~RGWGetBucketInstanceInfoCR() override { + request_cleanup(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info); + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { + RGWRados *store; + string source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + uint64_t versioned_epoch; + + real_time src_mtime; + + bool copy_if_newer; + rgw_zone_set *zones_trace; + +protected: + int _send_request() override; +public: + RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + const string& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + uint64_t _versioned_epoch, + bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + versioned_epoch(_versioned_epoch), + copy_if_newer(_if_newer), zones_trace(_zones_trace) {} +}; + +class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + uint64_t versioned_epoch; + + real_time src_mtime; + + bool copy_if_newer; + + RGWAsyncFetchRemoteObj *req; + rgw_zone_set *zones_trace; + +public: + RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const string& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + uint64_t _versioned_epoch, + bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), + async_rados(_async_rados), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + versioned_epoch(_versioned_epoch), + copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {} + + + ~RGWFetchRemoteObjCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info, + key, versioned_epoch, copy_if_newer, zones_trace); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest { + RGWRados *store; + string source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + + ceph::real_time *pmtime; + uint64_t *psize; + map *pattrs; + +protected: + int _send_request() override; +public: + RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + const string& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + ceph::real_time *_pmtime, + uint64_t *_psize, + map *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + pmtime(_pmtime), + psize(_psize), + pattrs(_pattrs) {} +}; + +class RGWStatRemoteObjCR : public RGWSimpleCoroutine { + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + + ceph::real_time *pmtime; + uint64_t *psize; + map *pattrs; + + RGWAsyncStatRemoteObj *req; + +public: + RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const string& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + ceph::real_time *_pmtime, + uint64_t *_psize, + map *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), + async_rados(_async_rados), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + pmtime(_pmtime), + psize(_psize), + pattrs(_pattrs), + req(NULL) {} + + + ~RGWStatRemoteObjCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone, + bucket_info, key, pmtime, psize, pattrs); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWAsyncRemoveObj : public RGWAsyncRadosRequest { + RGWRados *store; + string source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + string owner; + string owner_display_name; + bool versioned; + uint64_t versioned_epoch; + string marker_version_id; + + bool del_if_older; + ceph::real_time timestamp; + rgw_zone_set *zones_trace; + +protected: + int _send_request() override; +public: + RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, + const string& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + const string& _owner, + const string& _owner_display_name, + bool _versioned, + uint64_t _versioned_epoch, + bool _delete_marker, + bool _if_older, + real_time& _timestamp, + rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + owner(_owner), + owner_display_name(_owner_display_name), + versioned(_versioned), + versioned_epoch(_versioned_epoch), + del_if_older(_if_older), + timestamp(_timestamp), zones_trace(_zones_trace) { + if (_delete_marker) { + marker_version_id = key.instance; + } + } +}; + +class RGWRemoveObjCR : public RGWSimpleCoroutine { + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + bool versioned; + uint64_t versioned_epoch; + bool delete_marker; + string owner; + string owner_display_name; + + bool del_if_older; + real_time timestamp; + + RGWAsyncRemoveObj *req; + + rgw_zone_set *zones_trace; + +public: + RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const string& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + bool _versioned, + uint64_t _versioned_epoch, + string *_owner, + string *_owner_display_name, + bool _delete_marker, + real_time *_timestamp, + rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), + async_rados(_async_rados), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + versioned(_versioned), + versioned_epoch(_versioned_epoch), + delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) { + del_if_older = (_timestamp != NULL); + if (_timestamp) { + timestamp = *_timestamp; + } + + if (_owner) { + owner = *_owner; + } + + if (_owner_display_name) { + owner_display_name = *_owner_display_name; + } + } + ~RGWRemoveObjCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request() override { + req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info, + key, owner, owner_display_name, versioned, versioned_epoch, + delete_marker, del_if_older, timestamp, zones_trace); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWContinuousLeaseCR : public RGWCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + + const rgw_raw_obj obj; + + const string lock_name; + const string cookie; + + int interval; + + Mutex lock; + std::atomic going_down = { false }; + bool locked{false}; + + RGWCoroutine *caller; + + bool aborted{false}; + +public: + RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + const rgw_raw_obj& _obj, + const string& _lock_name, int _interval, RGWCoroutine *_caller) + : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + obj(_obj), lock_name(_lock_name), + cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), + interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller) + {} + + int operate() override; + + bool is_locked() { + Mutex::Locker l(lock); + return locked; + } + + void set_locked(bool status) { + Mutex::Locker l(lock); + locked = status; + } + + void go_down() { + going_down = true; + wakeup(); + } + + void abort() { + aborted = true; + } +}; + +class RGWRadosTimelogAddCR : public RGWSimpleCoroutine { + RGWRados *store; + list entries; + + string oid; + + boost::intrusive_ptr cn; + +public: + RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid, + const cls_log_entry& entry); + + int send_request() override; + int request_complete() override; +}; + +class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { + RGWRados *store; + boost::intrusive_ptr cn; + protected: + std::string oid; + real_time start_time; + real_time end_time; + std::string from_marker; + std::string to_marker; + + public: + RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid, + const real_time& start_time, const real_time& end_time, + const std::string& from_marker, + const std::string& to_marker); + + int send_request() override; + int request_complete() override; +}; + +// wrapper to update last_trim_marker on success +class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR { + CephContext *cct; + std::string *last_trim_marker; + public: + RGWSyncLogTrimCR(RGWRados *store, const std::string& oid, + const std::string& to_marker, std::string *last_trim_marker); + int request_complete() override; +}; + +class RGWAsyncStatObj : public RGWAsyncRadosRequest { + RGWRados *store; + RGWBucketInfo bucket_info; + rgw_obj obj; + uint64_t *psize; + real_time *pmtime; + uint64_t *pepoch; + RGWObjVersionTracker *objv_tracker; +protected: + int _send_request() override; +public: + RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store, + const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr, + real_time *pmtime = nullptr, uint64_t *pepoch = nullptr, + RGWObjVersionTracker *objv_tracker = nullptr) + : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize), + pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {} +}; + +class RGWStatObjCR : public RGWSimpleCoroutine { + RGWRados *store; + RGWAsyncRadosProcessor *async_rados; + RGWBucketInfo bucket_info; + rgw_obj obj; + uint64_t *psize; + real_time *pmtime; + uint64_t *pepoch; + RGWObjVersionTracker *objv_tracker; + RGWAsyncStatObj *req = nullptr; + public: + RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store, + const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr, + real_time* pmtime = nullptr, uint64_t *pepoch = nullptr, + RGWObjVersionTracker *objv_tracker = nullptr); + ~RGWStatObjCR() override { + request_cleanup(); + } + void request_cleanup() override; + + int send_request() override; + int request_complete() override; +}; + +#endif