X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_coroutine.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_coroutine.h;h=f84232875616236f1daa611a964b6dfa16379776;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_coroutine.h b/src/ceph/src/rgw/rgw_coroutine.h new file mode 100644 index 0000000..f842328 --- /dev/null +++ b/src/ceph/src/rgw/rgw_coroutine.h @@ -0,0 +1,588 @@ +#ifndef CEPH_RGW_COROUTINE_H +#define CEPH_RGW_COROUTINE_H + +#ifdef _ASSERT_H +#define NEED_ASSERT_H +#pragma push_macro("_ASSERT_H") +#endif + +#include +#include + +#ifdef NEED_ASSERT_H +#pragma pop_macro("_ASSERT_H") +#endif + +#include "include/utime.h" +#include "common/RefCountedObj.h" +#include "common/debug.h" +#include "common/Timer.h" +#include "common/admin_socket.h" + +#include "rgw_common.h" +#include + +#include + +#define RGW_ASYNC_OPS_MGR_WINDOW 100 + +class RGWCoroutinesStack; +class RGWCoroutinesManager; +class RGWAioCompletionNotifier; + +class RGWCompletionManager : public RefCountedObject { + CephContext *cct; + list complete_reqs; + using NotifierRef = boost::intrusive_ptr; + set cns; + + Mutex lock; + Cond cond; + + SafeTimer timer; + + std::atomic going_down = { false }; + + map waiters; + + class WaitContext; + +protected: + void _wakeup(void *opaque); + void _complete(RGWAioCompletionNotifier *cn, void *user_info); +public: + RGWCompletionManager(CephContext *_cct); + ~RGWCompletionManager() override; + + void complete(RGWAioCompletionNotifier *cn, void *user_info); + int get_next(void **user_info); + bool try_get_next(void **user_info); + + void go_down(); + + /* + * wait for interval length to complete user_info + */ + void wait_interval(void *opaque, const utime_t& interval, void *user_info); + void wakeup(void *opaque); + + void register_completion_notifier(RGWAioCompletionNotifier *cn); + void unregister_completion_notifier(RGWAioCompletionNotifier *cn); +}; + +/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ +class RGWAioCompletionNotifier : public RefCountedObject { + librados::AioCompletion *c; + RGWCompletionManager *completion_mgr; + void *user_data; + Mutex lock; + bool registered; + +public: + RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data); + ~RGWAioCompletionNotifier() override { + c->release(); + lock.Lock(); + bool need_unregister = registered; + if (registered) { + completion_mgr->get(); + } + registered = false; + lock.Unlock(); + if (need_unregister) { + completion_mgr->unregister_completion_notifier(this); + completion_mgr->put(); + } + } + + librados::AioCompletion *completion() { + return c; + } + + void unregister() { + Mutex::Locker l(lock); + if (!registered) { + return; + } + registered = false; + } + + void cb() { + lock.Lock(); + if (!registered) { + lock.Unlock(); + put(); + return; + } + completion_mgr->get(); + registered = false; + lock.Unlock(); + completion_mgr->complete(this, user_data); + completion_mgr->put(); + put(); + } +}; + +struct RGWCoroutinesEnv { + uint64_t run_context; + RGWCoroutinesManager *manager; + list *scheduled_stacks; + RGWCoroutinesStack *stack; + + RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {} +}; + +enum RGWCoroutineState { + RGWCoroutine_Error = -2, + RGWCoroutine_Done = -1, + RGWCoroutine_Run = 0, +}; + +struct rgw_spawned_stacks { + vector entries; + + rgw_spawned_stacks() {} + + void add_pending(RGWCoroutinesStack *s) { + entries.push_back(s); + } + + void inherit(rgw_spawned_stacks *source) { + for (vector::iterator iter = source->entries.begin(); + iter != source->entries.end(); ++iter) { + add_pending(*iter); + } + source->entries.clear(); + } +}; + + + +class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { + friend class RGWCoroutinesStack; + + struct StatusItem { + utime_t timestamp; + string status; + + StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {} + + void dump(Formatter *f) const; + }; + +#define MAX_COROUTINE_HISTORY 10 + + struct Status { + CephContext *cct; + RWLock lock; + int max_history; + + utime_t timestamp; + stringstream status; + + Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {} + + deque history; + + stringstream& set_status(); + } status; + + stringstream description; + +protected: + bool _yield_ret; + boost::asio::coroutine drain_cr; + + CephContext *cct; + + RGWCoroutinesStack *stack; + int retcode; + int state; + + rgw_spawned_stacks spawned; + + stringstream error_stream; + + int set_state(int s, int ret = 0) { + state = s; + return ret; + } + int set_cr_error(int ret) { + state = RGWCoroutine_Error; + return ret; + } + int set_cr_done() { + state = RGWCoroutine_Done; + return 0; + } + void set_io_blocked(bool flag); + int io_block(int ret = 0); + + void reset_description() { + description.str(string()); + } + + stringstream& set_description() { + return description; + } + stringstream& set_status() { + return status.set_status(); + } + + stringstream& set_status(const string& s) { + stringstream& status = set_status(); + status << s; + return status; + } + +public: + RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} + ~RGWCoroutine() override; + + virtual int operate() = 0; + + bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); } + bool is_error() { return (state == RGWCoroutine_Error); } + + stringstream& log_error() { return error_stream; } + string error_str() { + return error_stream.str(); + } + + void set_retcode(int r) { + retcode = r; + } + + int get_ret_status() { + return retcode; + } + + void call(RGWCoroutine *op); /* call at the same stack we're in */ + RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ + bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ + bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ + + int wait(const utime_t& interval); + bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */ + void wakeup(); + void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ + + size_t num_spawned() { + return spawned.entries.size(); + } + + void wait_for_child(); + + virtual string to_str() const; + + RGWCoroutinesStack *get_stack() const { + return stack; + } + + void dump(Formatter *f) const; +}; + +ostream& operator<<(ostream& out, const RGWCoroutine& cr); + +#define yield_until_true(x) \ +do { \ + do { \ + yield _yield_ret = x; \ + } while (!_yield_ret); \ + _yield_ret = false; \ +} while (0) + +#define drain_all() \ + drain_cr = boost::asio::coroutine(); \ + yield_until_true(drain_children(0)) + +#define drain_all_but(n) \ + drain_cr = boost::asio::coroutine(); \ + yield_until_true(drain_children(n)) + +#define drain_all_but_stack(stack) \ + drain_cr = boost::asio::coroutine(); \ + yield_until_true(drain_children(1, stack)) + +template +class RGWConsumerCR : public RGWCoroutine { + list product; + +public: + RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {} + + bool has_product() { + return !product.empty(); + } + + void wait_for_product() { + if (!has_product()) { + set_sleeping(true); + } + } + + bool consume(T *p) { + if (product.empty()) { + return false; + } + *p = product.front(); + product.pop_front(); + return true; + } + + void receive(const T& p, bool wakeup = true); + void receive(list& l, bool wakeup = true); +}; + +class RGWCoroutinesStack : public RefCountedObject { + friend class RGWCoroutine; + friend class RGWCoroutinesManager; + + CephContext *cct; + + RGWCoroutinesManager *ops_mgr; + + list ops; + list::iterator pos; + + rgw_spawned_stacks spawned; + + set blocked_by_stack; + set blocking_stacks; + + bool done_flag; + bool error_flag; + bool blocked_flag; + bool sleep_flag; + bool interval_wait_flag; + + bool is_scheduled; + + bool is_waiting_for_child; + + int retcode; + + uint64_t run_count; + +protected: + RGWCoroutinesEnv *env; + RGWCoroutinesStack *parent; + + RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); + bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ + bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */ +public: + RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); + ~RGWCoroutinesStack() override; + + int operate(RGWCoroutinesEnv *env); + + bool is_done() { + return done_flag; + } + bool is_error() { + return error_flag; + } + bool is_blocked_by_stack() { + return !blocked_by_stack.empty(); + } + void set_io_blocked(bool flag) { + blocked_flag = flag; + } + bool is_io_blocked() { + return blocked_flag; + } + void set_interval_wait(bool flag) { + interval_wait_flag = flag; + } + bool is_interval_waiting() { + return interval_wait_flag; + } + void set_sleeping(bool flag) { + bool wakeup = sleep_flag & !flag; + sleep_flag = flag; + if (wakeup) { + schedule(); + } + } + bool is_sleeping() { + return sleep_flag; + } + void set_is_scheduled(bool flag) { + is_scheduled = flag; + } + + bool is_blocked() { + return is_blocked_by_stack() || is_sleeping() || + is_io_blocked() || waiting_for_child() ; + } + + void schedule(list *stacks = NULL) { + if (!stacks) { + stacks = env->scheduled_stacks; + } + if (!is_scheduled) { + stacks->push_back(this); + is_scheduled = true; + } + } + + int get_ret_status() { + return retcode; + } + + string error_str(); + + void call(RGWCoroutine *next_op); + RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait); + int unwind(int retcode); + + int wait(const utime_t& interval); + void wakeup(); + + bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ + + RGWAioCompletionNotifier *create_completion_notifier(); + RGWCompletionManager *get_completion_mgr(); + + void set_blocked_by(RGWCoroutinesStack *s) { + blocked_by_stack.insert(s); + s->blocking_stacks.insert(this); + } + + void set_wait_for_child(bool flag) { + is_waiting_for_child = flag; + } + + bool waiting_for_child() { + return is_waiting_for_child; + } + + bool unblock_stack(RGWCoroutinesStack **s); + + RGWCoroutinesEnv *get_env() { return env; } + + void dump(Formatter *f) const; +}; + +template +void RGWConsumerCR::receive(list& l, bool wakeup) +{ + product.splice(product.end(), l); + if (wakeup) { + set_sleeping(false); + } +} + + +template +void RGWConsumerCR::receive(const T& p, bool wakeup) +{ + product.push_back(p); + if (wakeup) { + set_sleeping(false); + } +} + +class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook { + CephContext *cct; + + set managers; + RWLock lock; + + string admin_command; + +public: + RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {} + ~RGWCoroutinesManagerRegistry() override; + + void add(RGWCoroutinesManager *mgr); + void remove(RGWCoroutinesManager *mgr); + + int hook_to_admin_command(const string& command); + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) override; + + void dump(Formatter *f) const; +}; + +class RGWCoroutinesManager { + CephContext *cct; + std::atomic going_down = { false }; + + std::atomic run_context_count = { 0 }; + map > run_contexts; + + RWLock lock; + + void handle_unblocked_stack(set& context_stacks, list& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count); +protected: + RGWCompletionManager *completion_mgr; + RGWCoroutinesManagerRegistry *cr_registry; + + int ops_window; + + string id; + + void put_completion_notifier(RGWAioCompletionNotifier *cn); +public: + RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"), + cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { + completion_mgr = new RGWCompletionManager(cct); + if (cr_registry) { + cr_registry->add(this); + } + } + virtual ~RGWCoroutinesManager() { + stop(); + completion_mgr->put(); + if (cr_registry) { + cr_registry->remove(this); + } + } + + int run(list& ops); + int run(RGWCoroutine *op); + void stop() { + bool expected = false; + if (going_down.compare_exchange_strong(expected, true)) { + completion_mgr->go_down(); + } + } + + virtual void report_error(RGWCoroutinesStack *op); + + RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); + RGWCompletionManager *get_completion_mgr() { return completion_mgr; } + + void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); + RGWCoroutinesStack *allocate_stack(); + + virtual string get_id(); + void dump(Formatter *f) const; +}; + +class RGWSimpleCoroutine : public RGWCoroutine { + bool called_cleanup; + + int operate() override; + + int state_init(); + int state_send_request(); + int state_request_complete(); + int state_all_complete(); + + void call_cleanup(); + +public: + RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {} + ~RGWSimpleCoroutine() override; + + virtual int init() { return 0; } + virtual int send_request() = 0; + virtual int request_complete() = 0; + virtual int finish() { return 0; } + virtual void request_cleanup() {} +}; + +#endif