#ifndef CEPH_RGW_COROUTINE_H #define CEPH_RGW_COROUTINE_H #ifdef _ASSERT_H #define NEED_ASSERT_H #pragma push_macro("_ASSERT_H") #endif #include #include #ifdef NEED_ASSERT_H #pragma pop_macro("_ASSERT_H") #endif #include "include/utime.h" #include "common/RefCountedObj.h" #include "common/debug.h" #include "common/Timer.h" #include "common/admin_socket.h" #include "rgw_common.h" #include #include #define RGW_ASYNC_OPS_MGR_WINDOW 100 class RGWCoroutinesStack; class RGWCoroutinesManager; class RGWAioCompletionNotifier; class RGWCompletionManager : public RefCountedObject { CephContext *cct; list complete_reqs; using NotifierRef = boost::intrusive_ptr; set cns; Mutex lock; Cond cond; SafeTimer timer; std::atomic going_down = { false }; map waiters; class WaitContext; protected: void _wakeup(void *opaque); void _complete(RGWAioCompletionNotifier *cn, void *user_info); public: RGWCompletionManager(CephContext *_cct); ~RGWCompletionManager() override; void complete(RGWAioCompletionNotifier *cn, void *user_info); int get_next(void **user_info); bool try_get_next(void **user_info); void go_down(); /* * wait for interval length to complete user_info */ void wait_interval(void *opaque, const utime_t& interval, void *user_info); void wakeup(void *opaque); void register_completion_notifier(RGWAioCompletionNotifier *cn); void unregister_completion_notifier(RGWAioCompletionNotifier *cn); }; /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ class RGWAioCompletionNotifier : public RefCountedObject { librados::AioCompletion *c; RGWCompletionManager *completion_mgr; void *user_data; Mutex lock; bool registered; public: RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data); ~RGWAioCompletionNotifier() override { c->release(); lock.Lock(); bool need_unregister = registered; if (registered) { completion_mgr->get(); } registered = false; lock.Unlock(); if (need_unregister) { completion_mgr->unregister_completion_notifier(this); completion_mgr->put(); } } librados::AioCompletion *completion() { return c; } void unregister() { Mutex::Locker l(lock); if (!registered) { return; } registered = false; } void cb() { lock.Lock(); if (!registered) { lock.Unlock(); put(); return; } completion_mgr->get(); registered = false; lock.Unlock(); completion_mgr->complete(this, user_data); completion_mgr->put(); put(); } }; struct RGWCoroutinesEnv { uint64_t run_context; RGWCoroutinesManager *manager; list *scheduled_stacks; RGWCoroutinesStack *stack; RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {} }; enum RGWCoroutineState { RGWCoroutine_Error = -2, RGWCoroutine_Done = -1, RGWCoroutine_Run = 0, }; struct rgw_spawned_stacks { vector entries; rgw_spawned_stacks() {} void add_pending(RGWCoroutinesStack *s) { entries.push_back(s); } void inherit(rgw_spawned_stacks *source) { for (vector::iterator iter = source->entries.begin(); iter != source->entries.end(); ++iter) { add_pending(*iter); } source->entries.clear(); } }; class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { friend class RGWCoroutinesStack; struct StatusItem { utime_t timestamp; string status; StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {} void dump(Formatter *f) const; }; #define MAX_COROUTINE_HISTORY 10 struct Status { CephContext *cct; RWLock lock; int max_history; utime_t timestamp; stringstream status; Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {} deque history; stringstream& set_status(); } status; stringstream description; protected: bool _yield_ret; boost::asio::coroutine drain_cr; CephContext *cct; RGWCoroutinesStack *stack; int retcode; int state; rgw_spawned_stacks spawned; stringstream error_stream; int set_state(int s, int ret = 0) { state = s; return ret; } int set_cr_error(int ret) { state = RGWCoroutine_Error; return ret; } int set_cr_done() { state = RGWCoroutine_Done; return 0; } void set_io_blocked(bool flag); int io_block(int ret = 0); void reset_description() { description.str(string()); } stringstream& set_description() { return description; } stringstream& set_status() { return status.set_status(); } stringstream& set_status(const string& s) { stringstream& status = set_status(); status << s; return status; } public: RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} ~RGWCoroutine() override; virtual int operate() = 0; bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); } bool is_error() { return (state == RGWCoroutine_Error); } stringstream& log_error() { return error_stream; } string error_str() { return error_stream.str(); } void set_retcode(int r) { retcode = r; } int get_ret_status() { return retcode; } void call(RGWCoroutine *op); /* call at the same stack we're in */ RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ int wait(const utime_t& interval); bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */ void wakeup(); void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ size_t num_spawned() { return spawned.entries.size(); } void wait_for_child(); virtual string to_str() const; RGWCoroutinesStack *get_stack() const { return stack; } void dump(Formatter *f) const; }; ostream& operator<<(ostream& out, const RGWCoroutine& cr); #define yield_until_true(x) \ do { \ do { \ yield _yield_ret = x; \ } while (!_yield_ret); \ _yield_ret = false; \ } while (0) #define drain_all() \ drain_cr = boost::asio::coroutine(); \ yield_until_true(drain_children(0)) #define drain_all_but(n) \ drain_cr = boost::asio::coroutine(); \ yield_until_true(drain_children(n)) #define drain_all_but_stack(stack) \ drain_cr = boost::asio::coroutine(); \ yield_until_true(drain_children(1, stack)) template class RGWConsumerCR : public RGWCoroutine { list product; public: RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {} bool has_product() { return !product.empty(); } void wait_for_product() { if (!has_product()) { set_sleeping(true); } } bool consume(T *p) { if (product.empty()) { return false; } *p = product.front(); product.pop_front(); return true; } void receive(const T& p, bool wakeup = true); void receive(list& l, bool wakeup = true); }; class RGWCoroutinesStack : public RefCountedObject { friend class RGWCoroutine; friend class RGWCoroutinesManager; CephContext *cct; RGWCoroutinesManager *ops_mgr; list ops; list::iterator pos; rgw_spawned_stacks spawned; set blocked_by_stack; set blocking_stacks; bool done_flag; bool error_flag; bool blocked_flag; bool sleep_flag; bool interval_wait_flag; bool is_scheduled; bool is_waiting_for_child; int retcode; uint64_t run_count; protected: RGWCoroutinesEnv *env; RGWCoroutinesStack *parent; RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */ public: RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); ~RGWCoroutinesStack() override; int operate(RGWCoroutinesEnv *env); bool is_done() { return done_flag; } bool is_error() { return error_flag; } bool is_blocked_by_stack() { return !blocked_by_stack.empty(); } void set_io_blocked(bool flag) { blocked_flag = flag; } bool is_io_blocked() { return blocked_flag; } void set_interval_wait(bool flag) { interval_wait_flag = flag; } bool is_interval_waiting() { return interval_wait_flag; } void set_sleeping(bool flag) { bool wakeup = sleep_flag & !flag; sleep_flag = flag; if (wakeup) { schedule(); } } bool is_sleeping() { return sleep_flag; } void set_is_scheduled(bool flag) { is_scheduled = flag; } bool is_blocked() { return is_blocked_by_stack() || is_sleeping() || is_io_blocked() || waiting_for_child() ; } void schedule(list *stacks = NULL) { if (!stacks) { stacks = env->scheduled_stacks; } if (!is_scheduled) { stacks->push_back(this); is_scheduled = true; } } int get_ret_status() { return retcode; } string error_str(); void call(RGWCoroutine *next_op); RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait); int unwind(int retcode); int wait(const utime_t& interval); void wakeup(); bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ RGWAioCompletionNotifier *create_completion_notifier(); RGWCompletionManager *get_completion_mgr(); void set_blocked_by(RGWCoroutinesStack *s) { blocked_by_stack.insert(s); s->blocking_stacks.insert(this); } void set_wait_for_child(bool flag) { is_waiting_for_child = flag; } bool waiting_for_child() { return is_waiting_for_child; } bool unblock_stack(RGWCoroutinesStack **s); RGWCoroutinesEnv *get_env() { return env; } void dump(Formatter *f) const; }; template void RGWConsumerCR::receive(list& l, bool wakeup) { product.splice(product.end(), l); if (wakeup) { set_sleeping(false); } } template void RGWConsumerCR::receive(const T& p, bool wakeup) { product.push_back(p); if (wakeup) { set_sleeping(false); } } class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook { CephContext *cct; set managers; RWLock lock; string admin_command; public: RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {} ~RGWCoroutinesManagerRegistry() override; void add(RGWCoroutinesManager *mgr); void remove(RGWCoroutinesManager *mgr); int hook_to_admin_command(const string& command); bool call(std::string command, cmdmap_t& cmdmap, std::string format, bufferlist& out) override; void dump(Formatter *f) const; }; class RGWCoroutinesManager { CephContext *cct; std::atomic going_down = { false }; std::atomic run_context_count = { 0 }; map > run_contexts; RWLock lock; void handle_unblocked_stack(set& context_stacks, list& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count); protected: RGWCompletionManager *completion_mgr; RGWCoroutinesManagerRegistry *cr_registry; int ops_window; string id; void put_completion_notifier(RGWAioCompletionNotifier *cn); public: RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"), cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { completion_mgr = new RGWCompletionManager(cct); if (cr_registry) { cr_registry->add(this); } } virtual ~RGWCoroutinesManager() { stop(); completion_mgr->put(); if (cr_registry) { cr_registry->remove(this); } } int run(list& ops); int run(RGWCoroutine *op); void stop() { bool expected = false; if (going_down.compare_exchange_strong(expected, true)) { completion_mgr->go_down(); } } virtual void report_error(RGWCoroutinesStack *op); RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); RGWCompletionManager *get_completion_mgr() { return completion_mgr; } void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); RGWCoroutinesStack *allocate_stack(); virtual string get_id(); void dump(Formatter *f) const; }; class RGWSimpleCoroutine : public RGWCoroutine { bool called_cleanup; int operate() override; int state_init(); int state_send_request(); int state_request_complete(); int state_all_complete(); void call_cleanup(); public: RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {} ~RGWSimpleCoroutine() override; virtual int init() { return 0; } virtual int send_request() = 0; virtual int request_complete() = 0; virtual int finish() { return 0; } virtual void request_cleanup() {} }; #endif